roboticus-agent 0.11.4

Agent core with ReAct loop, policy engine, injection defense, memory system, and skill loader
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
//! MCP connection manager — lifecycle, health checks, and CapabilityRegistry bridging.
//!
//! Coordinates raw MCP connections (`LiveMcpConnection`) with the
//! `CapabilityRegistry` (tool registration) and provides a cancellable
//! health-check loop driven by a `tokio::sync::watch` channel.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{RwLock, watch};
use tracing::{debug, info, warn};

use roboticus_core::config::{McpServerConfig, McpServerSpec, McpTransport};

use super::bridge::bridge_tools;
use super::client::{LiveMcpConnection, McpClientError};
use crate::capability::{Capability, CapabilityRegistry};

// ── Status ────────────────────────────────────────────────────────────────────

/// Status of a single MCP server connection for dashboard/WebUI reporting.
///
/// Snapshots are produced by [`McpConnectionManager::server_statuses`]. The
/// type derives `serde::Serialize`, so it can be serialized directly.
#[derive(Debug, Clone, serde::Serialize)]
pub struct McpServerStatus {
    /// Key the server is stored under in the manager (the configured server name).
    pub name: String,
    /// Result of the connection's `is_alive()` check when the snapshot was taken.
    pub connected: bool,
    /// Number of tools the connection exposes (`tools().len()`).
    pub tool_count: usize,
    /// Value returned by the connection's `server_name()` accessor.
    pub server_name: String,
    /// Value returned by the connection's `server_version()` accessor.
    pub server_version: String,
}

// ── Internal storage ──────────────────────────────────────────────────────────

/// An entry for a managed MCP server connection.
///
/// One entry is stored per server name in `McpConnectionManager::servers`.
struct ServerEntry {
    /// Shared connection handle — held by this manager and by `McpCapability` instances.
    connection: Arc<RwLock<LiveMcpConnection>>,
    /// Original config kept for reconnection (cloned by the health-check loop
    /// when the connection is found dead).
    config: McpServerConfig,
}

// ── Manager ───────────────────────────────────────────────────────────────────

/// Manages MCP connections with lifecycle, health checks, and CapabilityRegistry bridging.
///
/// `McpConnectionManager` owns the shared `Arc<RwLock<LiveMcpConnection>>`
/// handles. The same `Arc` is handed to `McpCapability` instances so that
/// tool calls go to the correct underlying server without locking the manager
/// itself.
///
/// Cancellation is provided through a `tokio::sync::watch` channel; call
/// [`McpConnectionManager::cancel`] to signal the health-check loop to stop.
pub struct McpConnectionManager {
    /// Managed servers keyed by their configured name.
    servers: RwLock<HashMap<String, ServerEntry>>,
    /// Sender half of the cancellation channel.  `true` means "stop".
    cancel_tx: watch::Sender<bool>,
    /// Receiver half of the cancellation channel: read by `is_cancelled` and
    /// cloned by `subscribe_cancel` for use in `health_check_loop`.
    cancel_rx: watch::Receiver<bool>,
}

impl Default for McpConnectionManager {
    fn default() -> Self {
        Self::new()
    }
}

impl McpConnectionManager {
    /// Build an empty manager with a fresh cancellation channel.
    ///
    /// The channel starts at `false` ("keep running"); no servers are registered.
    pub fn new() -> Self {
        let (tx, rx) = watch::channel(false);
        let servers = RwLock::new(HashMap::new());
        Self {
            servers,
            cancel_tx: tx,
            cancel_rx: rx,
        }
    }

    /// Reports whether [`cancel`](Self::cancel) has already been invoked,
    /// i.e. whether the cancellation channel currently holds `true`.
    pub fn is_cancelled(&self) -> bool {
        let flag = self.cancel_rx.borrow();
        *flag
    }

    /// Ask the health-check loop (and every other subscriber) to stop by
    /// publishing `true` on the cancellation channel.
    pub fn cancel(&self) {
        // A send error only means every receiver is gone, so there is nobody
        // left to notify and nothing to handle.
        let _send_outcome = self.cancel_tx.send(true);
    }

    /// Hand out an additional `watch::Receiver` observing the cancellation flag.
    ///
    /// The receiver is a clone of the manager's own receiver. Typical use is
    /// to drive the health-check loop from a background task:
    /// ```ignore
    /// let cancel = manager.subscribe_cancel();
    /// tokio::spawn(async move {
    ///     manager.health_check_loop(&registry, interval, cancel).await;
    /// });
    /// ```
    pub fn subscribe_cancel(&self) -> watch::Receiver<bool> {
        watch::Receiver::clone(&self.cancel_rx)
    }

    // ── Connection lifecycle ──────────────────────────────────────────────────

    /// Open a connection described by `config` and publish its tools to the
    /// `CapabilityRegistry`.
    ///
    /// # Errors
    /// Propagates any error from `LiveMcpConnection::connect` or from the
    /// subsequent registration step.
    ///
    /// # Returns
    /// The tool count reported by the registration step.
    pub async fn connect_server(
        &self,
        config: &McpServerConfig,
        registry: &CapabilityRegistry,
    ) -> Result<usize, McpClientError> {
        let established = LiveMcpConnection::connect(config).await?;
        let registered = self
            .register_connected_server(config, registry, established)
            .await?;
        Ok(registered)
    }

    /// Wire an already-established connection into the manager and the registry.
    ///
    /// All steps run while holding the write lock on the server map, so the
    /// map and the registry cannot disagree about which connection is current:
    ///
    /// 1. If a *live* connection is already stored under `config.name`, it
    ///    wins: `conn` is dropped, the registry is left untouched, and the
    ///    existing connection's tool count is returned.
    /// 2. Otherwise the connection's tools are bridged into capabilities and
    ///    swapped into the registry via `reload_mcp_server`.
    /// 3. The connection and a clone of `config` are stored for later
    ///    status queries and reconnection.
    ///
    /// The lock order (server map first, then registry) matches
    /// [`disconnect_server`](Self::disconnect_server).
    ///
    /// # Errors
    /// This function currently never returns `Err`: a registry failure is
    /// logged as a warning and the connection is still stored. The `Result`
    /// return type mirrors [`connect_server`](Self::connect_server).
    async fn register_connected_server(
        &self,
        config: &McpServerConfig,
        registry: &CapabilityRegistry,
        conn: LiveMcpConnection,
    ) -> Result<usize, McpClientError> {
        // Lock the map *before* touching the registry. Running the duplicate
        // check after the registry reload would leave the registry pointing at
        // the new connection while the manager keeps the old one.
        let mut servers = self.servers.write().await;

        // TOCTOU guard: if another caller already (re)connected this server
        // while we were busy connecting, skip the redundant registration.
        // On lock contention `try_read` fails and we fall through to replace.
        if let Some(existing) = servers.get(&config.name)
            && let Ok(existing_conn) = existing.connection.try_read()
            && existing_conn.is_alive()
        {
            debug!(
                server = %config.name,
                "MCP server already reconnected by another caller; dropping duplicate"
            );
            // Report the tools that are actually registered, i.e. the
            // surviving connection's, not the discarded duplicate's.
            return Ok(existing_conn.tools().len());
        }

        let tool_count = conn.tools().len();

        let transport = match &config.spec {
            McpServerSpec::Stdio { .. } => McpTransport::Stdio,
            McpServerSpec::Sse { .. } => McpTransport::Sse,
        };

        let conn_arc = Arc::new(RwLock::new(conn));

        // Build the capabilities under a short-lived read guard; the guard is
        // released before awaiting the registry.
        let cap_arcs: Vec<Arc<dyn Capability>> = {
            let conn_read = conn_arc.read().await;
            bridge_tools(
                &config.name,
                conn_read.tools(),
                transport,
                Arc::clone(&conn_arc),
            )
            .into_iter()
            .map(|cap| Arc::new(cap) as _)
            .collect()
        };

        // Best effort: a registry failure is logged, but the connection is
        // still tracked so status reporting and health checks can see it.
        if let Err(e) = registry.reload_mcp_server(&config.name, cap_arcs).await {
            warn!(
                server = %config.name,
                error = %e,
                "failed to register MCP tools in CapabilityRegistry"
            );
        }

        servers.insert(
            config.name.clone(),
            ServerEntry {
                connection: conn_arc,
                config: config.clone(),
            },
        );

        info!(
            server = %config.name,
            tool_count,
            "MCP server connected and tools registered"
        );
        Ok(tool_count)
    }

    /// Disconnect a server and unregister all its tools from the registry.
    pub async fn disconnect_server(&self, name: &str, registry: &CapabilityRegistry) {
        let mut servers = self.servers.write().await;
        if servers.remove(name).is_some() {
            // Reload with an empty list atomically removes all tools for this server.
            if let Err(e) = registry.reload_mcp_server(name, vec![]).await {
                warn!(server = %name, error = %e, "error unregistering MCP tools on disconnect");
            }
            info!(server = %name, "MCP server disconnected");
        }
    }

    /// Connect to all *enabled* servers in `configs`, logging warnings for failures.
    pub async fn connect_all(&self, configs: &[McpServerConfig], registry: &CapabilityRegistry) {
        for cfg in configs {
            if !cfg.enabled {
                debug!(name = %cfg.name, "skipping disabled MCP server");
                continue;
            }
            if let Err(e) = self.connect_server(cfg, registry).await {
                warn!(name = %cfg.name, error = %e, "failed to connect MCP server at startup");
            }
        }
    }

    // ── Status / introspection ────────────────────────────────────────────────

    /// Build one [`McpServerStatus`] per managed server.
    ///
    /// The server map stays read-locked for the whole call, and each
    /// connection is read-locked in turn while its fields are copied out.
    /// Entries come back in the map's iteration order.
    pub async fn server_statuses(&self) -> Vec<McpServerStatus> {
        let managed = self.servers.read().await;
        let mut snapshot = Vec::with_capacity(managed.len());

        for (key, entry) in &*managed {
            let live = entry.connection.read().await;
            let name = key.clone();
            let connected = live.is_alive();
            let tool_count = live.tools().len();
            let server_name = live.server_name().to_string();
            let server_version = live.server_version().to_string();
            snapshot.push(McpServerStatus {
                name,
                connected,
                tool_count,
                server_name,
                server_version,
            });
        }

        snapshot
    }

    /// Count the managed servers whose connection currently reports
    /// `is_alive()`.
    pub async fn connected_count(&self) -> usize {
        let managed = self.servers.read().await;
        let mut alive = 0usize;
        for entry in managed.values() {
            let live = entry.connection.read().await;
            // `true` contributes 1, `false` contributes 0.
            alive += usize::from(live.is_alive());
        }
        alive
    }

    /// How many servers the manager is tracking, regardless of whether their
    /// connections are still alive.
    pub async fn total_count(&self) -> usize {
        let managed = self.servers.read().await;
        managed.len()
    }

    /// Look up the shared connection handle for `name`, for direct tool dispatch.
    ///
    /// Returns `None` when no server with that name is managed; otherwise a
    /// new `Arc` clone of the stored handle.
    pub async fn get_connection(&self, name: &str) -> Option<Arc<RwLock<LiveMcpConnection>>> {
        let managed = self.servers.read().await;
        let entry = managed.get(name)?;
        Some(Arc::clone(&entry.connection))
    }

    // ── Health-check loop ─────────────────────────────────────────────────────

    /// Periodically pings all connections and reconnects any that have dropped.
    ///
    /// This method runs until [`cancel`](Self::cancel) is called (or the
    /// provided `cancel_rx` fires). Intended to be spawned as a background task:
    ///
    /// ```ignore
    /// let cancel = manager.subscribe_cancel();
    /// tokio::spawn(async move {
    ///     manager.health_check_loop(registry, Duration::from_secs(30), cancel).await;
    /// });
    /// ```
    pub async fn health_check_loop(
        &self,
        registry: &CapabilityRegistry,
        interval: Duration,
        mut cancel_rx: watch::Receiver<bool>,
    ) {
        loop {
            tokio::select! {
                _ = tokio::time::sleep(interval) => {}
                _ = cancel_rx.changed() => {
                    if *cancel_rx.borrow() {
                        debug!("MCP health-check loop cancelled");
                        return;
                    }
                }
            }

            // Collect names of dead connections while holding a read lock.
            let dead: Vec<McpServerConfig> = {
                let servers = self.servers.read().await;
                servers
                    .values()
                    .filter_map(|entry| {
                        // We need a blocking check here; `is_alive` is sync.
                        // We cannot `.await` inside a closure, so we do a
                        // try_read and fall back to assuming alive on contention.
                        if let Ok(conn) = entry.connection.try_read()
                            && !conn.is_alive()
                        {
                            return Some(entry.config.clone());
                        }
                        None
                    })
                    .collect()
            };

            for cfg in dead {
                warn!(server = %cfg.name, "MCP server connection lost — attempting reconnect");
                match self.connect_server(&cfg, registry).await {
                    Ok(tool_count) => {
                        info!(
                            server = %cfg.name,
                            tool_count,
                            "MCP server reconnected — tools re-registered"
                        );
                    }
                    Err(e) => {
                        warn!(server = %cfg.name, error = %e, "MCP reconnect failed");
                    }
                }
            }
        }
    }
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::mcp::client::test_support;
    use std::time::Duration;

    /// Build an SSE server config named `name` with the given `enabled` flag.
    /// The URL is a placeholder on an `.invalid` host.
    fn test_sse_config(name: &str, enabled: bool) -> McpServerConfig {
        McpServerConfig {
            name: name.into(),
            spec: McpServerSpec::Sse {
                url: "http://in-memory-test.invalid/mcp".into(),
            },
            enabled,
            auth_token_env: None,
            tool_allowlist: Vec::new(),
        }
    }

    /// A fresh manager tracks no servers.
    #[test]
    fn manager_new_is_empty() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let mgr = McpConnectionManager::new();
            assert_eq!(mgr.total_count().await, 0);
            assert_eq!(mgr.connected_count().await, 0);
            assert!(mgr.server_statuses().await.is_empty());
        });
    }

    /// `cancel()` flips `is_cancelled()` from `false` to `true`.
    #[test]
    fn manager_cancellation_works() {
        let mgr = McpConnectionManager::new();
        assert!(!mgr.is_cancelled());
        mgr.cancel();
        assert!(mgr.is_cancelled());
    }

    /// `McpServerStatus` serializes every field under its own name.
    #[test]
    fn server_status_serializes() {
        let status = McpServerStatus {
            name: "github".into(),
            connected: true,
            tool_count: 5,
            server_name: "github-mcp".into(),
            server_version: "1.0.0".into(),
        };
        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"name\":\"github\""));
        assert!(json.contains("\"connected\":true"));
        assert!(json.contains("\"tool_count\":5"));
        assert!(json.contains("\"server_name\":\"github-mcp\""));
        assert!(json.contains("\"server_version\":\"1.0.0\""));
    }

    /// `Default::default()` yields the same empty, non-cancelled state as `new()`.
    #[test]
    fn manager_default_matches_new() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let mgr = McpConnectionManager::default();
            assert_eq!(mgr.total_count().await, 0);
            assert!(!mgr.is_cancelled());
        });
    }

    /// A receiver obtained before `cancel()` observes the new value afterwards.
    #[test]
    fn subscribe_cancel_receiver_fires() {
        let mgr = McpConnectionManager::new();
        let rx = mgr.subscribe_cancel();
        assert!(!*rx.borrow());
        mgr.cancel();
        // After cancel(), the watch value is true.
        assert!(*rx.borrow());
        // changed() would not block — the receiver has an unseen value.
        // We use has_changed() to verify that synchronously, without async overhead.
        assert!(rx.has_changed().unwrap());
    }

    /// Registering a connection updates the manager counts, the registry, and
    /// the status snapshot.
    #[tokio::test]
    async fn connect_server_registers_registry_and_status() {
        let registry = CapabilityRegistry::new();
        let mgr = McpConnectionManager::new();
        let config = test_sse_config("remote-test", true);
        let (conn, server_handle) = test_support::echo_connection(&config.name).await.unwrap();

        let tool_count = mgr
            .register_connected_server(&config, &registry, conn)
            .await
            .unwrap();
        assert_eq!(tool_count, 1);
        assert_eq!(mgr.total_count().await, 1);
        assert_eq!(mgr.connected_count().await, 1);
        assert!(mgr.get_connection("remote-test").await.is_some());
        assert!(registry.get("remote-test::echo").await.is_some());

        let statuses = mgr.server_statuses().await;
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].name, "remote-test");
        assert!(statuses[0].connected);
        assert_eq!(statuses[0].tool_count, 1);

        // Stop the test server task and wait for it to finish.
        server_handle.abort();
        let _ = server_handle.await;
    }

    /// Disconnecting removes the server from the manager and its tools from
    /// the registry.
    #[tokio::test]
    async fn disconnect_server_unregisters_registry_capabilities() {
        let registry = CapabilityRegistry::new();
        let mgr = McpConnectionManager::new();
        let config = test_sse_config("remote-test", true);
        let (conn, server_handle) = test_support::echo_connection(&config.name).await.unwrap();
        mgr.register_connected_server(&config, &registry, conn)
            .await
            .unwrap();

        mgr.disconnect_server("remote-test", &registry).await;
        assert_eq!(mgr.total_count().await, 0);
        assert!(mgr.get_connection("remote-test").await.is_none());
        assert!(registry.get("remote-test::echo").await.is_none());

        // Stop the test server task and wait for it to finish.
        server_handle.abort();
        let _ = server_handle.await;
    }

    /// `connect_all` leaves disabled servers untouched: nothing is stored or
    /// registered for them.
    #[tokio::test]
    async fn connect_all_skips_disabled_servers() {
        let registry = CapabilityRegistry::new();
        let mgr = McpConnectionManager::new();
        let disabled_cfg = test_sse_config("disabled-test", false);
        mgr.connect_all(std::slice::from_ref(&disabled_cfg), &registry)
            .await;

        assert_eq!(mgr.total_count().await, 0);
        assert!(mgr.get_connection("disabled-test").await.is_none());
        assert!(registry.get("disabled-test::echo").await.is_none());
        assert!(!disabled_cfg.enabled);
    }

    /// Registering only the enabled server leaves exactly that server in the
    /// manager and the registry.
    #[tokio::test]
    async fn register_connected_server_supports_connect_all_style_registry_state() {
        let registry = CapabilityRegistry::new();
        let mgr = McpConnectionManager::new();
        let enabled_cfg = test_sse_config("enabled-test", true);
        let (enabled_conn, enabled_handle) = test_support::echo_connection(&enabled_cfg.name)
            .await
            .unwrap();

        mgr.register_connected_server(&enabled_cfg, &registry, enabled_conn)
            .await
            .unwrap();

        assert_eq!(mgr.total_count().await, 1);
        assert!(mgr.get_connection("enabled-test").await.is_some());
        assert!(mgr.get_connection("disabled-test").await.is_none());
        assert!(registry.get("enabled-test::echo").await.is_some());

        // Stop the test server task and wait for it to finish.
        enabled_handle.abort();
        let _ = enabled_handle.await;
    }

    /// With cancellation already signalled, the health-check loop returns
    /// within the one-second timeout.
    #[tokio::test]
    async fn health_check_loop_exits_when_cancelled() {
        let registry = CapabilityRegistry::new();
        let mgr = McpConnectionManager::new();
        let cancel = mgr.subscribe_cancel();
        mgr.cancel();

        tokio::time::timeout(
            Duration::from_secs(1),
            mgr.health_check_loop(&registry, Duration::from_millis(10), cancel),
        )
        .await
        .expect("health loop should exit promptly after cancellation");
    }
}