use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sha2::{Digest, Sha256};
use crate::mcp::{call_mcp_tool_with_hint, VmMcpClientHandle};
use crate::mcp_protocol::McpCacheHint;
use crate::mcp_registry::{self, RegisteredMcpServer};
use crate::value::VmError;
pub const DEFAULT_MAX_RESTARTS: u32 = 5;
pub const DEFAULT_RESTART_WINDOW: Duration = Duration::from_mins(5);
pub const DEFAULT_CIRCUIT_THRESHOLD: u32 = 5;
pub const DEFAULT_CIRCUIT_RESET: Duration = Duration::from_secs(30);
pub const INITIAL_RESTART_BACKOFF: Duration = Duration::from_millis(100);
pub const MAX_RESTART_BACKOFF: Duration = Duration::from_secs(5);
pub const RESPONSE_CACHE_MAX_ENTRIES_PER_TOOL: usize = 64;
#[derive(Debug)]
struct SupervisionState {
restart_attempts: Vec<Instant>,
consecutive_failures: u32,
breaker_opens_until: Option<Instant>,
ejected: bool,
circuit_threshold: u32,
circuit_reset: Duration,
max_restarts: u32,
restart_window: Duration,
}
impl SupervisionState {
fn new(policy: SupervisionPolicy) -> Self {
Self {
restart_attempts: Vec::new(),
consecutive_failures: 0,
breaker_opens_until: None,
ejected: false,
circuit_threshold: policy.circuit_threshold,
circuit_reset: policy.circuit_reset,
max_restarts: policy.max_restarts,
restart_window: policy.restart_window,
}
}
fn breaker_state(&mut self, now: Instant) -> BreakerState {
match self.breaker_opens_until {
Some(deadline) if now < deadline => BreakerState::Open,
Some(_) => BreakerState::HalfOpen,
None => BreakerState::Closed,
}
}
fn record_success(&mut self) {
self.consecutive_failures = 0;
self.breaker_opens_until = None;
}
fn record_failure(&mut self, now: Instant) {
self.consecutive_failures = self.consecutive_failures.saturating_add(1);
if self.consecutive_failures >= self.circuit_threshold {
self.breaker_opens_until = Some(now + self.circuit_reset);
}
}
fn record_restart(&mut self, now: Instant) -> bool {
self.prune_restart_window(now);
self.restart_attempts.push(now);
if self.restart_attempts.len() as u32 > self.max_restarts {
self.ejected = true;
return false;
}
true
}
fn backoff_delay(&self) -> Duration {
let attempt = self.restart_attempts.len() as u32;
let exp = attempt.saturating_sub(1).min(6);
let mul = 1u64 << exp;
let nanos = INITIAL_RESTART_BACKOFF.as_nanos() as u64 * mul;
Duration::from_nanos(nanos).min(MAX_RESTART_BACKOFF)
}
fn prune_restart_window(&mut self, now: Instant) {
let window = self.restart_window;
self.restart_attempts
.retain(|t| now.duration_since(*t) <= window);
}
fn clear(&mut self) {
self.restart_attempts.clear();
self.consecutive_failures = 0;
self.breaker_opens_until = None;
self.ejected = false;
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum BreakerState {
Closed,
Open,
HalfOpen,
}
impl BreakerState {
pub fn as_str(self) -> &'static str {
match self {
BreakerState::Closed => "closed",
BreakerState::Open => "open",
BreakerState::HalfOpen => "half_open",
}
}
}
#[derive(Clone, Copy, Debug)]
pub struct SupervisionPolicy {
pub circuit_threshold: u32,
pub circuit_reset: Duration,
pub max_restarts: u32,
pub restart_window: Duration,
}
impl Default for SupervisionPolicy {
fn default() -> Self {
Self {
circuit_threshold: DEFAULT_CIRCUIT_THRESHOLD,
circuit_reset: DEFAULT_CIRCUIT_RESET,
max_restarts: DEFAULT_MAX_RESTARTS,
restart_window: DEFAULT_RESTART_WINDOW,
}
}
}
#[derive(Clone, Debug)]
struct CachedResponse {
payload: JsonValue,
inserted_at: Instant,
expires_at: Instant,
#[allow(dead_code)]
scope: Option<&'static str>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllowlistDecision {
Allow,
Deny { reason: String },
}
pub type AllowlistGuard = Arc<dyn Fn(&str, Option<&str>) -> AllowlistDecision + Send + Sync>;
#[derive(Clone, Debug, Serialize)]
pub struct McpHostStatus {
pub name: String,
pub active: bool,
pub lazy: bool,
pub ref_count: usize,
pub restart_count: u32,
pub consecutive_failures: u32,
pub circuit: BreakerState,
pub ejected: bool,
pub cache_entries: usize,
}
#[derive(Clone, Debug, Default, Deserialize)]
pub struct SpawnOptions {
#[serde(default)]
pub lazy: bool,
#[serde(default)]
pub keep_alive_ms: Option<u64>,
#[serde(default)]
pub card: Option<String>,
#[serde(default)]
pub circuit_threshold: Option<u32>,
#[serde(default)]
pub circuit_reset_ms: Option<u64>,
#[serde(default)]
pub max_restarts: Option<u32>,
#[serde(default)]
pub restart_window_ms: Option<u64>,
}
impl SpawnOptions {
fn into_policy(self) -> (SupervisionPolicy, RegisteredMcpServerMeta) {
let default = SupervisionPolicy::default();
let policy = SupervisionPolicy {
circuit_threshold: self.circuit_threshold.unwrap_or(default.circuit_threshold),
circuit_reset: self
.circuit_reset_ms
.map(Duration::from_millis)
.unwrap_or(default.circuit_reset),
max_restarts: self.max_restarts.unwrap_or(default.max_restarts),
restart_window: self
.restart_window_ms
.map(Duration::from_millis)
.unwrap_or(default.restart_window),
};
let meta = RegisteredMcpServerMeta {
lazy: self.lazy,
keep_alive: self.keep_alive_ms.map(Duration::from_millis),
card: self.card,
};
(policy, meta)
}
}
struct RegisteredMcpServerMeta {
lazy: bool,
keep_alive: Option<Duration>,
card: Option<String>,
}
struct HostInner {
supervision: HashMap<String, SupervisionState>,
response_cache: HashMap<(String, String), HashMap<String, CachedResponse>>,
allowlist: Option<AllowlistGuard>,
cache_hits: u64,
cache_misses: u64,
}
impl HostInner {
fn new() -> Self {
Self {
supervision: HashMap::new(),
response_cache: HashMap::new(),
allowlist: None,
cache_hits: 0,
cache_misses: 0,
}
}
}
static HOST: Mutex<Option<HostInner>> = Mutex::new(None);
fn with_inner<F, R>(f: F) -> R
where
F: FnOnce(&mut HostInner) -> R,
{
let mut guard = HOST.lock().expect("mcp host mutex poisoned");
if guard.is_none() {
*guard = Some(HostInner::new());
}
f(guard.as_mut().expect("host inner just initialized"))
}
pub fn set_allowlist(guard: Option<AllowlistGuard>) {
with_inner(|inner| inner.allowlist = guard);
}
pub fn reset_for_tests() {
with_inner(|inner| {
inner.supervision.clear();
inner.response_cache.clear();
inner.allowlist = None;
inner.cache_hits = 0;
inner.cache_misses = 0;
});
mcp_registry::reset();
}
#[derive(Clone, Copy, Debug)]
pub struct CacheStats {
pub hits: u64,
pub misses: u64,
}
pub fn cache_stats() -> CacheStats {
with_inner(|inner| CacheStats {
hits: inner.cache_hits,
misses: inner.cache_misses,
})
}
pub async fn spawn(spec: JsonValue, options: SpawnOptions) -> Result<String, VmError> {
let name = spec
.get("name")
.and_then(|v| v.as_str())
.ok_or_else(|| VmError::Runtime("mcp.spawn: spec must include a `name` field".into()))?
.to_string();
if name.is_empty() {
return Err(VmError::Runtime(
"mcp.spawn: spec.name must be a non-empty string".into(),
));
}
if let Some(guard) = current_allowlist() {
if let AllowlistDecision::Deny { reason } = guard(&name, None) {
return Err(VmError::Runtime(format!(
"mcp.spawn({name}): denied by allowlist: {reason}"
)));
}
}
let (policy, meta) = options.into_policy();
mcp_registry::register_servers(vec![RegisteredMcpServer {
name: name.clone(),
spec: spec.clone(),
lazy: meta.lazy,
card: meta.card,
keep_alive: meta.keep_alive,
}]);
with_inner(|inner| {
inner
.supervision
.insert(name.clone(), SupervisionState::new(policy));
});
if !meta.lazy {
let _ = mcp_registry::ensure_active(&name).await.inspect_err(|_| {
with_inner(|inner| {
inner.supervision.remove(&name);
});
})?;
}
Ok(name)
}
pub fn stop(name: &str) -> Result<(), VmError> {
if !mcp_registry::is_registered(name) {
return Err(VmError::Runtime(format!(
"mcp.stop: no server named '{name}' is hosted"
)));
}
mcp_registry::release(name);
with_inner(|inner| {
inner.supervision.remove(name);
inner.response_cache.retain(|(s, _), _| s != name);
});
Ok(())
}
pub fn reload(name: &str) -> Result<(), VmError> {
if !mcp_registry::is_registered(name) {
return Err(VmError::Runtime(format!(
"mcp.reload: no server named '{name}' is hosted"
)));
}
mcp_registry::release(name);
with_inner(|inner| {
if let Some(state) = inner.supervision.get_mut(name) {
state.clear();
}
inner.response_cache.retain(|(s, _), _| s != name);
});
Ok(())
}
pub async fn tools(name: &str) -> Result<Vec<JsonValue>, VmError> {
let handle = ensure_or_restart(name).await?;
let result = supervised_call(name, || async {
handle.call("tools/list", serde_json::json!({})).await
})
.await?;
let mut tools = result
.get("tools")
.and_then(|t| t.as_array())
.cloned()
.unwrap_or_default();
for tool in tools.iter_mut() {
if let Some(obj) = tool.as_object_mut() {
obj.entry("_mcp_server")
.or_insert_with(|| JsonValue::String(name.to_string()));
}
}
Ok(tools)
}
pub async fn call(name: &str, tool: &str, args: JsonValue) -> Result<JsonValue, VmError> {
if let Some(guard) = current_allowlist() {
if let AllowlistDecision::Deny { reason } = guard(name, Some(tool)) {
return Err(VmError::Runtime(format!(
"mcp.call({name}/{tool}): denied by allowlist: {reason}"
)));
}
}
crate::call_budget::charge_mcp_call()?;
let now = Instant::now();
let args_hash = hash_args(&args);
if let Some(payload) = take_cache_hit(name, tool, &args_hash, now) {
return Ok(payload);
}
with_inner(|inner| inner.cache_misses = inner.cache_misses.saturating_add(1));
breaker_gate(name, now)?;
let handle = ensure_or_restart(name).await?;
let envelope_hint: Arc<Mutex<Option<McpCacheHint>>> = Arc::new(Mutex::new(None));
let hint_slot = Arc::clone(&envelope_hint);
let result = supervised_call(name, move || {
let handle = handle.clone();
let tool = tool.to_string();
let args = args.clone();
let hint_slot = Arc::clone(&hint_slot);
async move {
let (content, hint) = call_mcp_tool_with_hint(&handle, &tool, args).await?;
if let Ok(mut slot) = hint_slot.lock() {
*slot = hint;
}
Ok(content)
}
})
.await?;
let hint = envelope_hint.lock().ok().and_then(|slot| *slot);
if let Some(hint) = hint {
insert_cache(name, tool, &args_hash, &result, hint, now);
}
Ok(result)
}
pub async fn discover() -> Result<Vec<JsonValue>, VmError> {
let names: Vec<String> = mcp_registry::snapshot_status()
.into_iter()
.map(|s| s.name)
.collect();
let mut out: Vec<JsonValue> = Vec::new();
for name in names {
if let Some(guard) = current_allowlist() {
if matches!(guard(&name, None), AllowlistDecision::Deny { .. }) {
continue;
}
}
match tools(&name).await {
Ok(tools) => {
for tool in tools {
let tool_name = tool
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
out.push(serde_json::json!({
"server": name,
"tool": tool_name,
"schema": tool,
}));
}
}
Err(err) => {
out.push(serde_json::json!({
"server": name,
"error": err.to_string(),
}));
}
}
}
Ok(out)
}
pub fn status() -> Vec<McpHostStatus> {
let registry: BTreeMap<String, mcp_registry::RegistryStatus> = mcp_registry::snapshot_status()
.into_iter()
.map(|s| (s.name.clone(), s))
.collect();
with_inner(|inner| {
let mut out = Vec::new();
let now = Instant::now();
for (name, reg) in ®istry {
let (restart_count, consecutive_failures, circuit, ejected) =
if let Some(state) = inner.supervision.get_mut(name) {
let st = state.breaker_state(now);
(
state.restart_attempts.len() as u32,
state.consecutive_failures,
st,
state.ejected,
)
} else {
(0, 0, BreakerState::Closed, false)
};
let cache_entries = inner
.response_cache
.iter()
.filter(|((s, _), _)| s == name)
.map(|(_, v)| v.len())
.sum();
out.push(McpHostStatus {
name: name.clone(),
active: reg.active,
lazy: reg.lazy,
ref_count: reg.ref_count,
restart_count,
consecutive_failures,
circuit,
ejected,
cache_entries,
});
}
out
})
}
fn current_allowlist() -> Option<AllowlistGuard> {
with_inner(|inner| inner.allowlist.clone())
}
fn breaker_gate(name: &str, now: Instant) -> Result<(), VmError> {
with_inner(|inner| {
let Some(state) = inner.supervision.get_mut(name) else {
return Ok(());
};
if state.ejected {
return Err(VmError::Runtime(format!(
"mcp.call({name}): server is ejected after exhausting its restart budget; call `harn.mcp.reload({name:?})` to clear"
)));
}
match state.breaker_state(now) {
BreakerState::Open => Err(VmError::Runtime(format!(
"mcp.call({name}): circuit breaker is open (last {n} consecutive failures); retry after the breaker resets",
n = state.consecutive_failures
))),
BreakerState::Closed | BreakerState::HalfOpen => Ok(()),
}
})
}
async fn ensure_or_restart(name: &str) -> Result<VmMcpClientHandle, VmError> {
if let Some(handle) = mcp_registry::active_handle(name) {
return Ok(handle);
}
mcp_registry::ensure_active(name).await
}
async fn supervised_call<F, Fut>(name: &str, op: F) -> Result<JsonValue, VmError>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<JsonValue, VmError>>,
{
let span = tracing::info_span!(
"harn.mcp.call",
otel.name = "harn.mcp.call",
harn.mcp.server = name,
);
let _enter = span.enter();
let first = op().await;
match first {
Ok(v) => {
with_inner(|inner| {
if let Some(state) = inner.supervision.get_mut(name) {
state.record_success();
}
});
Ok(v)
}
Err(err) => {
let now = Instant::now();
let (should_retry, backoff) = with_inner(|inner| {
let Some(state) = inner.supervision.get_mut(name) else {
return (false, Duration::ZERO);
};
state.record_failure(now);
if !looks_like_transport_failure(&err) {
return (false, Duration::ZERO);
}
let ok = state.record_restart(now);
if !ok {
return (false, Duration::ZERO);
}
(true, state.backoff_delay())
});
if !should_retry {
tracing::warn!(
server = name,
error = %err,
"harn.mcp.call: failure (no retry)"
);
return Err(err);
}
tracing::info!(
server = name,
error = %err,
backoff_ms = backoff.as_millis() as u64,
"harn.mcp.call: retrying after transport failure"
);
mcp_registry::release(name);
tokio::time::sleep(backoff).await;
let _handle = ensure_or_restart(name).await?;
let second = op().await;
match &second {
Ok(_) => with_inner(|inner| {
if let Some(state) = inner.supervision.get_mut(name) {
state.record_success();
}
}),
Err(err) => with_inner(|inner| {
if let Some(state) = inner.supervision.get_mut(name) {
state.record_failure(Instant::now());
}
tracing::warn!(
server = name,
error = %err,
"harn.mcp.call: second attempt failed"
);
}),
}
second
}
}
}
fn looks_like_transport_failure(err: &VmError) -> bool {
let text = err.to_string();
let needles = [
"server closed connection",
"disconnected",
"MCP read error",
"MCP write error",
"did not respond to",
"MCP flush error",
"connect",
];
needles.iter().any(|n| text.contains(n))
}
fn hash_args(args: &JsonValue) -> String {
let mut hasher = Sha256::new();
let canonical = canonicalize_json(args);
hasher.update(canonical.as_bytes());
let digest = hasher.finalize();
let mut hex = String::with_capacity(digest.len() * 2);
for byte in digest {
use std::fmt::Write;
let _ = write!(&mut hex, "{byte:02x}");
}
hex
}
fn canonicalize_json(value: &JsonValue) -> String {
match value {
JsonValue::Object(map) => {
let mut sorted: Vec<(&String, &JsonValue)> = map.iter().collect();
sorted.sort_by(|a, b| a.0.cmp(b.0));
let body: Vec<String> = sorted
.into_iter()
.map(|(k, v)| {
format!(
"{}:{}",
serde_json::to_string(k).unwrap_or_default(),
canonicalize_json(v)
)
})
.collect();
format!("{{{}}}", body.join(","))
}
JsonValue::Array(items) => {
let body: Vec<String> = items.iter().map(canonicalize_json).collect();
format!("[{}]", body.join(","))
}
other => serde_json::to_string(other).unwrap_or_default(),
}
}
fn take_cache_hit(server: &str, tool: &str, args_hash: &str, now: Instant) -> Option<JsonValue> {
with_inner(|inner| {
let key = (server.to_string(), tool.to_string());
let entry = inner.response_cache.get_mut(&key)?;
let cached = entry.get(args_hash)?;
if now >= cached.expires_at {
entry.remove(args_hash);
return None;
}
let payload = cached.payload.clone();
inner.cache_hits = inner.cache_hits.saturating_add(1);
Some(payload)
})
}
fn insert_cache(
server: &str,
tool: &str,
args_hash: &str,
payload: &JsonValue,
hint: McpCacheHint,
now: Instant,
) {
let Some(ttl_ms) = hint.ttl_ms else {
return;
};
if ttl_ms == 0 {
return;
}
let expires_at = now + Duration::from_millis(ttl_ms);
let cached = CachedResponse {
payload: payload.clone(),
inserted_at: now,
expires_at,
scope: hint.scope,
};
with_inner(|inner| {
let key = (server.to_string(), tool.to_string());
let bucket = inner.response_cache.entry(key).or_default();
if bucket.len() >= RESPONSE_CACHE_MAX_ENTRIES_PER_TOOL {
if let Some(oldest_key) = bucket
.iter()
.min_by_key(|(_, v)| v.inserted_at)
.map(|(k, _)| k.clone())
{
bucket.remove(&oldest_key);
}
}
bucket.insert(args_hash.to_string(), cached);
});
}
#[cfg(test)]
fn insert_cache_if_hinted(
server: &str,
tool: &str,
args_hash: &str,
payload: &JsonValue,
now: Instant,
) {
if let Some(hint) = McpCacheHint::from_result(payload) {
insert_cache(server, tool, args_hash, payload, hint, now);
}
}
#[cfg(test)]
mod tests {
use super::*;
static TEST_LOCK: Mutex<()> = Mutex::new(());
fn lock() -> std::sync::MutexGuard<'static, ()> {
TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner())
}
#[test]
fn supervision_breaker_opens_after_threshold() {
let _g = lock();
let mut state = SupervisionState::new(SupervisionPolicy {
circuit_threshold: 3,
circuit_reset: Duration::from_millis(100),
..SupervisionPolicy::default()
});
let t0 = Instant::now();
assert_eq!(state.breaker_state(t0), BreakerState::Closed);
state.record_failure(t0);
state.record_failure(t0);
assert_eq!(state.breaker_state(t0), BreakerState::Closed);
state.record_failure(t0);
assert_eq!(state.breaker_state(t0), BreakerState::Open);
assert_eq!(
state.breaker_state(t0 + Duration::from_millis(200)),
BreakerState::HalfOpen
);
}
#[test]
fn supervision_restart_budget_ejects_after_n_attempts() {
let _g = lock();
let mut state = SupervisionState::new(SupervisionPolicy {
max_restarts: 2,
restart_window: Duration::from_mins(1),
..SupervisionPolicy::default()
});
let t = Instant::now();
assert!(state.record_restart(t));
assert!(state.record_restart(t));
assert!(!state.record_restart(t));
assert!(state.ejected);
}
#[test]
fn supervision_backoff_grows_exponentially_then_caps() {
let _g = lock();
let mut state = SupervisionState::new(SupervisionPolicy::default());
let t = Instant::now();
state.record_restart(t);
let d1 = state.backoff_delay();
state.record_restart(t);
let d2 = state.backoff_delay();
state.record_restart(t);
let d3 = state.backoff_delay();
assert!(
d2 > d1,
"second backoff ({d2:?}) should exceed first ({d1:?})"
);
assert!(d3 > d2);
for _ in 0..16 {
state.record_restart(t);
}
assert!(state.backoff_delay() <= MAX_RESTART_BACKOFF);
}
#[test]
fn canonical_json_sorts_object_keys() {
let a = canonicalize_json(&serde_json::json!({"b": 1, "a": 2}));
let b = canonicalize_json(&serde_json::json!({"a": 2, "b": 1}));
assert_eq!(a, b);
}
#[test]
fn hash_args_is_stable_across_key_order() {
let h1 = hash_args(&serde_json::json!({"x": 1, "y": [1, 2]}));
let h2 = hash_args(&serde_json::json!({"y": [1, 2], "x": 1}));
assert_eq!(h1, h2);
}
#[test]
fn cache_insert_and_take_respects_ttl() {
let _g = lock();
reset_for_tests();
let payload = serde_json::json!({
"ttlMs": 100,
"cacheScope": "private",
"value": 1
});
let now = Instant::now();
insert_cache_if_hinted("srv", "ping", "deadbeef", &payload, now);
let hit = take_cache_hit("srv", "ping", "deadbeef", now);
assert!(hit.is_some(), "fresh entry should hit");
let stale = take_cache_hit("srv", "ping", "deadbeef", now + Duration::from_millis(200));
assert!(stale.is_none(), "expired entry should miss");
}
#[test]
fn cache_skips_payload_without_hint() {
let _g = lock();
reset_for_tests();
insert_cache_if_hinted(
"srv",
"ping",
"h",
&serde_json::json!({"value": 1}),
Instant::now(),
);
assert!(take_cache_hit("srv", "ping", "h", Instant::now()).is_none());
}
#[test]
fn allowlist_denies_disallowed_tool() {
let _g = lock();
reset_for_tests();
set_allowlist(Some(Arc::new(|server, tool| {
if server == "github" && tool == Some("delete_repo") {
AllowlistDecision::Deny {
reason: "destructive tool blocked".into(),
}
} else {
AllowlistDecision::Allow
}
})));
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let err = runtime
.block_on(call("github", "delete_repo", serde_json::json!({})))
.unwrap_err();
assert!(err.to_string().contains("denied by allowlist"));
set_allowlist(None);
}
#[test]
fn stop_unregistered_server_errors() {
let _g = lock();
reset_for_tests();
let err = stop("nope").unwrap_err();
assert!(err.to_string().contains("no server named 'nope'"));
}
#[test]
fn supervision_record_success_resets_counters() {
let _g = lock();
let mut state = SupervisionState::new(SupervisionPolicy::default());
let t = Instant::now();
state.record_failure(t);
state.record_failure(t);
state.record_success();
assert_eq!(state.consecutive_failures, 0);
assert!(state.breaker_opens_until.is_none());
}
#[test]
fn looks_like_transport_failure_matches_common_errors() {
let cases = [
"MCP: server closed connection",
"MCP: server did not respond to 'tools/call' within 60s",
"MCP write error: broken pipe",
"MCP client is disconnected",
];
for msg in cases {
assert!(
looks_like_transport_failure(&VmError::Runtime(msg.into())),
"expected {msg:?} to be classified as transport failure"
);
}
assert!(
!looks_like_transport_failure(&VmError::Runtime(
"tool 'foo' rejected arguments".into()
)),
"tool-level errors must not trigger an auto-restart"
);
}
}