1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
//! Process-local MCP server registry for lazy boot + skill-scoped
//! binding (harn#75).
//!
//! Holds the declared MCP server specs from `harn.toml` along with a
//! live client handle once a server has been booted. Servers marked
//! `lazy = true` stay idle until the first `mcp_ensure_active` or
//! `mcp_call` targets them.
//!
//! Ref-counting semantics:
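//! - `ensure_active(name)` — connects if needed; each call increments
//!   the active-binder count by 1.
//! - `release(name)` — decrements the binder count. When it reaches 0 the
//!   client is disconnected immediately if the server has no keep-alive;
//!   otherwise `sweep_expired` disconnects it once `keep_alive_ms` has
//!   elapsed without a new `ensure_active`.
//! - A non-lazy connection installed via `install_active` is held
//!   "forever" (its ref count is pinned at the sentinel `usize::MAX / 2`,
//!   which `release` ignores) until process exit or `reset`.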
//!
//! The registry lives per-process as a `Mutex<RegistryInner>` — agent
//! loops, skill activations, and the CLI's `connect_mcp_servers`
//! function all operate on the same instance.
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use crate::mcp::{connect_mcp_server_from_json, VmMcpClientHandle};
use crate::value::VmError;
/// A single MCP server declaration as registered with the VM.
///
/// This mirrors the CLI's `McpServerConfig`, but the VM keeps its own
/// copy so harn-cli can hand the declarations off and forget about them.
#[derive(Debug, Clone)]
pub struct RegisteredMcpServer {
    /// Server name; `register_servers` also uses it as the registry key.
    pub name: String,
    /// Connection spec passed to `connect_mcp_server_from_json` when the
    /// server is booted on demand by `ensure_active`.
    pub spec: serde_json::Value,
    /// Lazy servers stay idle until first requested; eager (non-lazy) ones
    /// are expected to be connected up front by the CLI via
    /// `install_active`.
    pub lazy: bool,
    /// Optional card source (URL or local path) from `harn.toml`.
    pub card: Option<String>,
    /// Grace period to keep the connection open after the final release.
    /// `None` means the connection is dropped as soon as the binder count
    /// reaches 0.
    pub keep_alive: Option<Duration>,
}
/// Live state for a server that currently has a connected client.
struct ActiveConnection {
    /// Client handle; callers of `ensure_active` / `active_handle` receive
    /// clones of it.
    handle: VmMcpClientHandle,
    /// How many binders (e.g. skills) currently hold the server open.
    /// Eagerly installed connections carry the sentinel `usize::MAX / 2`,
    /// which `release` treats as "never drop".
    ref_count: usize,
    /// Set when a `release` leaves the count at zero; reset to `None`
    /// whenever the connection is bound again. Compared against the
    /// registration's `keep_alive` to decide when the entry may be swept.
    last_released_at: Option<Instant>,
}
/// Everything the registry tracks, guarded as one unit by `REGISTRY`.
struct RegistryInner {
    /// Declared servers keyed by name (populated by `register_servers`).
    servers: BTreeMap<String, RegisteredMcpServer>,
    /// Currently connected servers keyed by name; entries are added only
    /// by `install_active` and `ensure_active`.
    active: BTreeMap<String, ActiveConnection>,
}
impl RegistryInner {
    /// An empty registry. Declared `const` so it can initialise the
    /// `REGISTRY` static directly.
    const fn new() -> Self {
        let servers = BTreeMap::new();
        let active = BTreeMap::new();
        Self { servers, active }
    }
}
/// The process-wide registry instance every function in this module uses.
static REGISTRY: Mutex<RegistryInner> = Mutex::new(RegistryInner::new());
/// Record every MCP server declared in `harn.toml`, keyed by name.
///
/// Safe to call repeatedly: a later registration for the same name
/// overwrites the stored spec/card, while any live connection in the
/// active table is left as-is.
pub fn register_servers(servers: Vec<RegisteredMcpServer>) {
    let keyed = servers
        .into_iter()
        .map(|server| (server.name.clone(), server));
    REGISTRY
        .lock()
        .expect("mcp registry poisoned")
        .servers
        .extend(keyed);
}
/// Report whether a registration exists for `name`, whether lazy or
/// eager and whether or not it is connected. Callers such as
/// `mcp_ensure_active` and skill activation use this to produce a
/// helpful "not found" error.
pub fn is_registered(name: &str) -> bool {
    let registry = REGISTRY.lock().expect("mcp registry poisoned");
    registry.servers.contains_key(name)
}
/// Fetch an owned copy of the registration for `name`, if any. The CLI
/// helper uses this so manifest lookups don't need to reparse harn.toml.
pub fn get_registration(name: &str) -> Option<RegisteredMcpServer> {
    let registry = REGISTRY.lock().expect("mcp registry poisoned");
    registry.servers.get(name).cloned()
}
/// Forget all registrations and drop all live connections. Intended for
/// `reset_thread_local_state` and tests.
pub fn reset() {
    let mut guard = REGISTRY.lock().expect("mcp registry poisoned");
    let inner = &mut *guard;
    inner.servers.clear();
    inner.active.clear();
}
/// Store an already-connected handle under `name`, bypassing the lazy
/// boot path. The CLI uses this for non-lazy servers it starts eagerly.
///
/// Any connection previously stored under `name` is replaced.
pub fn install_active(name: &str, handle: VmMcpClientHandle) {
    // Eager connections carry this sentinel count so `release` leaves
    // them alone and `ensure_active` never bumps them.
    let pinned = ActiveConnection {
        handle,
        ref_count: usize::MAX / 2,
        last_released_at: None,
    };
    REGISTRY
        .lock()
        .expect("mcp registry poisoned")
        .active
        .insert(name.to_owned(), pinned);
}
/// Get a clone of the connected client handle for `name`.
///
/// Yields `None` when nothing is connected under that name, even if the
/// server is registered; use `ensure_active` to boot a lazy server.
pub fn active_handle(name: &str) -> Option<VmMcpClientHandle> {
    let registry = REGISTRY.lock().expect("mcp registry poisoned");
    let connection = registry.active.get(name)?;
    Some(connection.handle.clone())
}
/// Connect to a registered server if not already connected, and bump
/// its binder count. Returns the live client handle.
///
/// Fails with `VmError::Runtime` when:
/// - `name` isn't registered.
/// - The connection attempt itself fails.
pub async fn ensure_active(name: &str) -> Result<VmMcpClientHandle, VmError> {
// Fast path: connection exists. Bump ref count under the lock.
{
let mut guard = REGISTRY.lock().expect("mcp registry poisoned");
if let Some(active) = guard.active.get_mut(name) {
if active.ref_count != usize::MAX / 2 {
active.ref_count = active.ref_count.saturating_add(1);
}
active.last_released_at = None;
return Ok(active.handle.clone());
}
}
// Slow path: fetch spec, connect outside the lock (connect is
// async, can't await while holding Mutex), then install.
let spec = {
let guard = REGISTRY.lock().expect("mcp registry poisoned");
guard.servers.get(name).cloned()
};
let registration = spec.ok_or_else(|| {
VmError::Runtime(format!(
"mcp: no server named '{name}' is registered (check harn.toml)"
))
})?;
let handle = connect_mcp_server_from_json(®istration.spec).await?;
// Install under the lock. Handle race: another task may have
// connected the same server in the meantime — if so, keep the
// incumbent handle and silently drop ours (the new child process
// will exit when the handle is dropped).
let mut guard = REGISTRY.lock().expect("mcp registry poisoned");
match guard.active.get_mut(name) {
Some(existing) => {
if existing.ref_count != usize::MAX / 2 {
existing.ref_count = existing.ref_count.saturating_add(1);
}
existing.last_released_at = None;
Ok(existing.handle.clone())
}
None => {
guard.active.insert(
name.to_string(),
ActiveConnection {
handle: handle.clone(),
ref_count: 1,
last_released_at: None,
},
);
Ok(handle)
}
}
}
/// Decrement the binder count for `name`.
///
/// - Unknown names and pinned connections (installed through
///   `install_active`) are left untouched.
/// - While other binders remain, only the count is decremented.
/// - When the last binder releases, the connection is removed right away
///   if the registration's `keep_alive` is unset or zero (this also
///   applies when the server is no longer registered). Otherwise the
///   release time is recorded so `sweep_expired` can disconnect it once
///   the keep-alive window has elapsed.
/// - Releasing a connection that is already idle (count 0) is a no-op,
///   so an unbalanced extra `release` cannot push back its expiry.
pub fn release(name: &str) {
    // Sentinel ref count that `install_active` uses for pinned connections.
    const PINNED: usize = usize::MAX / 2;

    let mut guard = REGISTRY.lock().expect("mcp registry poisoned");
    let keep_alive = guard
        .servers
        .get(name)
        .and_then(|s| s.keep_alive)
        .unwrap_or(Duration::ZERO);
    let disconnect_now = match guard.active.get_mut(name) {
        None => false,
        Some(active) => match active.ref_count {
            // Pinned: never released. Already idle: keep the original
            // release timestamp instead of restarting the keep-alive.
            PINNED | 0 => false,
            1 => {
                active.ref_count = 0;
                active.last_released_at = Some(Instant::now());
                keep_alive.is_zero()
            }
            _ => {
                active.ref_count -= 1;
                false
            }
        },
    };
    if disconnect_now {
        guard.active.remove(name);
    }
}
/// Force-disconnect servers whose keep-alive has elapsed. Called
/// periodically by the agent loop's post-turn housekeeping.
///
/// Only idle connections are candidates: ref count 0 with a recorded
/// release time. A candidate is removed once its registration's
/// `keep_alive` has passed since that release; the keep-alive counts as
/// zero when it is unset or the server is no longer registered. Nothing
/// here awaits; expired entries are removed from the map, which drops
/// their handles while the registry lock is held.
pub fn sweep_expired() {
    let mut guard = REGISTRY.lock().expect("mcp registry poisoned");
    let now = Instant::now();
    // Reborrow the guarded value once so `servers` (read) and `active`
    // (mutated) can be borrowed as disjoint fields.
    let inner = &mut *guard;
    let servers = &inner.servers;
    inner.active.retain(|name, active| {
        if active.ref_count != 0 {
            return true;
        }
        let Some(released_at) = active.last_released_at else {
            return true;
        };
        let keep_alive = servers
            .get(name)
            .and_then(|s| s.keep_alive)
            .unwrap_or(Duration::ZERO);
        // Keep the entry while it is still inside its keep-alive window.
        now.duration_since(released_at) < keep_alive
    });
}
/// One row of the registry's diagnostic snapshot, as produced by
/// `snapshot_status`. Used by the `mcp_server_info` builtin's extended
/// mode and by tests.
#[derive(Debug, Clone)]
pub struct RegistryStatus {
    /// Registered server name.
    pub name: String,
    /// Copy of the registration's `lazy` flag.
    pub lazy: bool,
    /// Whether a live connection is currently held for this server.
    pub active: bool,
    /// Current binder count. This is `0` when not connected, and the
    /// pinned sentinel for eagerly installed connections.
    pub ref_count: usize,
    /// Copy of the registration's card source.
    pub card: Option<String>,
}
/// Build one `RegistryStatus` row per registered server, in name order.
///
/// Servers that are connected (e.g. via `install_active`) but have no
/// registration do not appear in the snapshot.
pub fn snapshot_status() -> Vec<RegistryStatus> {
    let registry = REGISTRY.lock().expect("mcp registry poisoned");
    registry
        .servers
        .iter()
        .map(|(name, server)| {
            let connection = registry.active.get(name);
            RegistryStatus {
                name: name.clone(),
                lazy: server.lazy,
                active: connection.is_some(),
                ref_count: connection.map_or(0, |c| c.ref_count),
                card: server.card.clone(),
            }
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The registry is process-global, so unit tests must serialize to
    /// avoid interfering with each other. Every test body holds the guard
    /// returned by `serial()`, which is cheaper than spinning up a new
    /// process per test.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Take `TEST_LOCK`, recovering the guard if an earlier test panicked
    /// while holding it. With a bare `unwrap()`, one failing test would
    /// poison the lock and make every later test fail with `PoisonError`,
    /// burying the original failure.
    fn serial() -> std::sync::MutexGuard<'static, ()> {
        TEST_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Stdio spec for a server that these tests never actually boot.
    fn make_spec(name: &str) -> serde_json::Value {
        serde_json::json!({
            "name": name,
            "transport": "stdio",
            "command": "/bin/true",
            "args": [],
        })
    }

    /// Build a registration with the given flags and a dummy spec.
    fn server(
        name: &str,
        lazy: bool,
        card: Option<&str>,
        keep_alive: Option<Duration>,
    ) -> RegisteredMcpServer {
        RegisteredMcpServer {
            name: name.into(),
            spec: make_spec(name),
            lazy,
            card: card.map(str::to_owned),
            keep_alive,
        }
    }

    #[test]
    fn register_and_snapshot() {
        let _g = serial();
        reset();
        register_servers(vec![server("x", true, Some("card.json"), None)]);
        let snap = snapshot_status();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].name, "x");
        assert!(snap[0].lazy);
        assert!(!snap[0].active);
        assert_eq!(snap[0].ref_count, 0);
        assert_eq!(snap[0].card.as_deref(), Some("card.json"));
    }

    #[test]
    fn release_on_unknown_is_noop() {
        let _g = serial();
        reset();
        release("doesnt-exist");
        assert!(snapshot_status().is_empty());
    }

    #[test]
    fn is_registered_reflects_state() {
        let _g = serial();
        reset();
        assert!(!is_registered("a"));
        register_servers(vec![server("a", false, None, None)]);
        assert!(is_registered("a"));
    }

    #[test]
    fn reregistering_replaces_spec_and_card() {
        let _g = serial();
        reset();
        register_servers(vec![server("r", true, Some("old.json"), None)]);
        register_servers(vec![server(
            "r",
            false,
            Some("new.json"),
            Some(Duration::from_secs(5)),
        )]);
        let snap = snapshot_status();
        assert_eq!(snap.len(), 1);
        assert!(!snap[0].lazy);
        assert_eq!(snap[0].card.as_deref(), Some("new.json"));
        let registration = get_registration("r").expect("r stays registered");
        assert_eq!(registration.keep_alive, Some(Duration::from_secs(5)));
    }

    #[test]
    fn release_and_sweep_without_connection_are_noops() {
        let _g = serial();
        reset();
        register_servers(vec![server(
            "idle",
            true,
            None,
            Some(Duration::from_millis(10)),
        )]);
        release("idle");
        sweep_expired();
        assert!(is_registered("idle"));
        assert!(active_handle("idle").is_none());
        let snap = snapshot_status();
        assert_eq!(snap.len(), 1);
        assert!(!snap[0].active);
        assert_eq!(snap[0].ref_count, 0);
    }

    #[test]
    fn reset_clears_registrations() {
        let _g = serial();
        register_servers(vec![server("gone", true, None, None)]);
        reset();
        assert!(!is_registered("gone"));
        assert!(get_registration("gone").is_none());
        assert!(snapshot_status().is_empty());
    }
}