1pub mod handler;
34pub(crate) mod http;
35pub mod lua_json;
36
37use std::collections::HashMap;
38use std::path::Path;
39use std::process::Stdio;
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use mlua_isle::AsyncIsle;
44use rmcp::{
45 model::{
46 ArgumentInfo, CallToolRequestParams, CancelledNotification, CancelledNotificationParam,
47 ClientRequest, CompleteRequestParams, GetPromptRequestParams, NumberOrString, PingRequest,
48 ReadResourceRequestParams, Reference, RootsListChangedNotification, ServerResult,
49 SubscribeRequestParams, UnsubscribeRequestParams,
50 },
51 service::{RoleClient, RunningService},
52 transport::TokioChildProcess,
53 ServiceExt,
54};
55use tokio::process::Command;
56use tokio::time::timeout;
57use tracing::warn;
58
59use agent_block_types::error::{BlockError, BlockResult};
60
61pub use handler::AgentBlockClientHandler;
62
63pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(30);
65
66pub struct McpManager {
67 pub servers: HashMap<String, RunningService<RoleClient, AgentBlockClientHandler>>,
71 rpc_timeout: Duration,
72 pub handler: AgentBlockClientHandler,
74}
75
76impl McpManager {
77 pub fn new() -> Self {
78 Self {
79 servers: HashMap::new(),
80 rpc_timeout: DEFAULT_RPC_TIMEOUT,
81 handler: AgentBlockClientHandler::new(),
82 }
83 }
84
85 pub fn with_rpc_timeout(rpc_timeout: Duration) -> BlockResult<Self> {
95 if rpc_timeout.is_zero() {
96 return Err(BlockError::Mcp(
97 "rpc_timeout must be > 0 (got Duration::ZERO); \
98 every MCP RPC would time out immediately"
99 .to_string(),
100 ));
101 }
102 Ok(Self {
103 servers: HashMap::new(),
104 rpc_timeout,
105 handler: AgentBlockClientHandler::new(),
106 })
107 }
108
109 pub async fn connect(
123 &mut self,
124 name: &str,
125 command: &str,
126 args: &[String],
127 trace_context: bool,
128 cwd: Option<&Path>,
129 ) -> BlockResult<()> {
130 let mut cmd = Command::new(command);
131 cmd.args(args).stderr(Stdio::inherit());
132 if let Some(dir) = cwd {
133 cmd.current_dir(dir);
134 }
135 let transport = TokioChildProcess::new(cmd).map_err(|e| {
136 warn!(server = %name, command = %command, error = %e, "mcp spawn failed");
137 BlockError::Mcp(format!("spawn '{command}': {e}"))
138 })?;
139 let rpc_timeout = self.rpc_timeout;
140 self.handler.ensure_server(name);
143 self.handler.set_trace_context(name, trace_context);
144 self.handler.server_name = Some(name.to_string());
153 let handler = self.handler.clone();
154 self.handler.server_name = None;
157 let running = timeout(rpc_timeout, handler.serve(transport))
158 .await
159 .map_err(|_| {
160 warn!(server = %name, timeout = ?rpc_timeout, "mcp initialize timed out");
161 BlockError::Timeout(format!(
162 "initialize '{name}' timed out after {rpc_timeout:?}"
163 ))
164 })?
165 .map_err(|e| {
166 warn!(server = %name, error = %e, "mcp initialize failed");
167 BlockError::Mcp(format!("initialize '{name}': {e}"))
168 })?;
169 self.servers.insert(name.to_string(), running);
170 Ok(())
171 }
172
173 pub async fn list_tools(&self, name: &str) -> BlockResult<serde_json::Value> {
177 let srv = self.servers.get(name).ok_or_else(|| {
178 warn!(server = %name, "mcp list_tools on unknown server");
179 BlockError::Mcp(format!("no server named '{name}'"))
180 })?;
181 let rpc_timeout = self.rpc_timeout;
182 let tools = timeout(rpc_timeout, srv.list_all_tools())
183 .await
184 .map_err(|_| {
185 warn!(server = %name, timeout = ?rpc_timeout, "mcp list_tools timed out");
186 BlockError::Timeout(format!(
187 "list_tools '{name}' timed out after {rpc_timeout:?}"
188 ))
189 })?
190 .map_err(|e| {
191 warn!(server = %name, error = %e, "mcp list_tools failed");
192 BlockError::Mcp(format!("list_tools '{name}': {e}"))
193 })?;
194 serde_json::to_value(&tools)
195 .map_err(|e| BlockError::Mcp(format!("serialize list_tools result: {e}")))
196 }
197
198 pub async fn call_tool(
212 &self,
213 name: &str,
214 tool_name: &str,
215 arguments: serde_json::Value,
216 ) -> BlockResult<serde_json::Value> {
217 let mut params = CallToolRequestParams::new(tool_name.to_string());
223 match arguments {
224 serde_json::Value::Object(obj) => {
225 params = params.with_arguments(obj);
226 }
227 serde_json::Value::Null => {}
228 other => {
229 let kind = match other {
230 serde_json::Value::Array(_) => "array",
231 serde_json::Value::String(_) => "string",
232 serde_json::Value::Number(_) => "number",
233 serde_json::Value::Bool(_) => "bool",
234 _ => "unknown",
235 };
236 return Err(BlockError::Mcp(format!(
237 "call_tool '{tool_name}' on '{name}': arguments must be a JSON object \
238 (got {kind})"
239 )));
240 }
241 }
242 let srv = self.servers.get(name).ok_or_else(|| {
243 warn!(server = %name, tool = %tool_name, "mcp call_tool on unknown server");
244 BlockError::Mcp(format!("no server named '{name}'"))
245 })?;
246 let rpc_timeout = self.rpc_timeout;
247 let result = timeout(rpc_timeout, srv.call_tool(params))
248 .await
249 .map_err(|_| {
250 warn!(server = %name, tool = %tool_name, timeout = ?rpc_timeout, "mcp call_tool timed out");
251 self.send_cancelled(name, None);
258 BlockError::Timeout(format!(
259 "call_tool '{tool_name}' on '{name}' timed out after {rpc_timeout:?}"
260 ))
261 })?
262 .map_err(|e| {
263 warn!(server = %name, tool = %tool_name, error = %e, "mcp call_tool failed");
264 BlockError::Mcp(format!("call_tool '{tool_name}' on '{name}': {e}"))
265 })?;
266 serde_json::to_value(&result)
267 .map_err(|e| BlockError::Mcp(format!("serialize call_tool result: {e}")))
268 }
269
270 pub async fn disconnect(&mut self, name: &str) -> BlockResult<()> {
285 let Some(running) = self.servers.remove(name) else {
286 return Ok(());
287 };
288 let cancel_timeout = self.rpc_timeout;
289 match timeout(cancel_timeout, running.cancel()).await {
290 Ok(Ok(_)) => Ok(()),
291 Ok(Err(e)) => {
292 warn!(server = %name, error = %e, "mcp cancel failed");
293 Err(BlockError::Mcp(format!("cancel '{name}': {e}")))
294 }
295 Err(_) => {
296 warn!(server = %name, timeout = ?cancel_timeout, "mcp cancel timed out");
297 Err(BlockError::Timeout(format!(
298 "cancel '{name}' timed out after {cancel_timeout:?}"
299 )))
300 }
301 }
302 }
303
304 pub async fn disconnect_all(&mut self) -> BlockResult<()> {
311 let mut first_err: Option<BlockError> = None;
312 let names: Vec<String> = self.servers.keys().cloned().collect();
313 for name in names {
314 if let Err(e) = self.disconnect(&name).await {
315 if first_err.is_none() {
316 first_err = Some(e);
317 } else {
318 warn!(server = %name, error = %e, "disconnect failed during disconnect_all");
319 }
320 }
321 }
322 match first_err {
323 Some(e) => Err(e),
324 None => Ok(()),
325 }
326 }
327
328 pub fn set_handler_isle(&mut self, isle: Arc<AsyncIsle>) {
336 self.handler.handler_isle = Some(isle);
337 }
338
339 pub fn set_main_isle(&mut self, isle: Arc<AsyncIsle>) {
351 self.handler.main_isle = Some(isle);
352 self.handler.start_dispatch_task();
353 }
354
355 pub async fn connect_http(
365 &mut self,
366 name: &str,
367 url: &str,
368 opts: serde_json::Value,
369 ) -> BlockResult<()> {
370 let trace_context = opts
371 .get("trace_context")
372 .and_then(|v| v.as_bool())
373 .unwrap_or(false);
374 self.handler.ensure_server(name);
375 self.handler.set_trace_context(name, trace_context);
376 self.handler.server_name = Some(name.to_string());
379 let handler = self.handler.clone();
380 self.handler.server_name = None;
381 let running =
382 http::connect_http_transport(name, url, &opts, handler, self.rpc_timeout).await?;
383 self.servers.insert(name.to_string(), running);
384 Ok(())
385 }
386
387 pub async fn list_resources(&self, name: &str) -> BlockResult<serde_json::Value> {
391 let srv = self.servers.get(name).ok_or_else(|| {
392 warn!(server = %name, "mcp list_resources on unknown server");
393 BlockError::Mcp(format!("no server named '{name}'"))
394 })?;
395 let rpc_timeout = self.rpc_timeout;
396 let resources = timeout(rpc_timeout, srv.list_all_resources())
397 .await
398 .map_err(|_| {
399 warn!(server = %name, timeout = ?rpc_timeout, "mcp list_resources timed out");
400 BlockError::Timeout(format!(
401 "list_resources '{name}' timed out after {rpc_timeout:?}"
402 ))
403 })?
404 .map_err(|e| {
405 warn!(server = %name, error = %e, "mcp list_resources failed");
406 BlockError::Mcp(format!("list_resources '{name}': {e}"))
407 })?;
408 serde_json::to_value(&resources)
409 .map_err(|e| BlockError::Mcp(format!("serialize list_resources result: {e}")))
410 }
411
412 pub async fn list_resource_templates(&self, name: &str) -> BlockResult<serde_json::Value> {
416 let srv = self.servers.get(name).ok_or_else(|| {
417 warn!(server = %name, "mcp list_resource_templates on unknown server");
418 BlockError::Mcp(format!("no server named '{name}'"))
419 })?;
420 let rpc_timeout = self.rpc_timeout;
421 let templates = timeout(rpc_timeout, srv.list_all_resource_templates())
422 .await
423 .map_err(|_| {
424 warn!(server = %name, timeout = ?rpc_timeout, "mcp list_resource_templates timed out");
425 BlockError::Timeout(format!(
426 "list_resource_templates '{name}' timed out after {rpc_timeout:?}"
427 ))
428 })?
429 .map_err(|e| {
430 warn!(server = %name, error = %e, "mcp list_resource_templates failed");
431 BlockError::Mcp(format!("list_resource_templates '{name}': {e}"))
432 })?;
433 serde_json::to_value(&templates)
434 .map_err(|e| BlockError::Mcp(format!("serialize list_resource_templates result: {e}")))
435 }
436
437 pub async fn ping(&self, name: &str) -> BlockResult<u64> {
447 let srv = self.servers.get(name).ok_or_else(|| {
448 warn!(server = %name, "mcp ping on unknown server");
449 BlockError::Mcp(format!("no server named '{name}'"))
450 })?;
451 let rpc_timeout = self.rpc_timeout;
452 let peer = srv.peer().clone();
455 let ping_req = ClientRequest::PingRequest(PingRequest::default());
456 let started = Instant::now();
459 let response = timeout(rpc_timeout, peer.send_request(ping_req))
460 .await
461 .map_err(|_| {
462 warn!(server = %name, timeout = ?rpc_timeout, "mcp ping timed out");
463 BlockError::Timeout(format!("ping '{name}' timed out after {rpc_timeout:?}"))
464 })?
465 .map_err(|e| {
466 warn!(server = %name, error = %e, "mcp ping failed");
467 BlockError::Mcp(format!("ping '{name}': {e}"))
468 })?;
469 match response {
470 ServerResult::EmptyResult(_) => {
471 let latency_ms = started.elapsed().as_millis() as u64;
472 Ok(latency_ms)
473 }
474 other => {
475 warn!(server = %name, "mcp ping: unexpected response");
476 Err(BlockError::Mcp(format!(
477 "ping '{name}': unexpected response: {other:?}"
478 )))
479 }
480 }
481 }
482
483 pub async fn read_resource(&self, name: &str, uri: &str) -> BlockResult<serde_json::Value> {
487 let srv = self.servers.get(name).ok_or_else(|| {
488 warn!(server = %name, uri = %uri, "mcp read_resource on unknown server");
489 BlockError::Mcp(format!("no server named '{name}'"))
490 })?;
491 let rpc_timeout = self.rpc_timeout;
492 let params = ReadResourceRequestParams::new(uri);
493 let result = timeout(rpc_timeout, srv.read_resource(params))
494 .await
495 .map_err(|_| {
496 warn!(server = %name, uri = %uri, timeout = ?rpc_timeout, "mcp read_resource timed out");
497 BlockError::Timeout(format!(
498 "read_resource '{uri}' on '{name}' timed out after {rpc_timeout:?}"
499 ))
500 })?
501 .map_err(|e| {
502 warn!(server = %name, uri = %uri, error = %e, "mcp read_resource failed");
503 BlockError::Mcp(format!("read_resource '{uri}' on '{name}': {e}"))
504 })?;
505 serde_json::to_value(&result)
506 .map_err(|e| BlockError::Mcp(format!("serialize read_resource result: {e}")))
507 }
508
509 pub async fn subscribe_resource(&self, name: &str, uri: &str) -> BlockResult<()> {
513 let srv = self.servers.get(name).ok_or_else(|| {
514 warn!(server = %name, uri = %uri, "mcp subscribe_resource on unknown server");
515 BlockError::Mcp(format!("no server named '{name}'"))
516 })?;
517 let rpc_timeout = self.rpc_timeout;
518 let params = SubscribeRequestParams::new(uri);
519 timeout(rpc_timeout, srv.subscribe(params))
520 .await
521 .map_err(|_| {
522 warn!(server = %name, uri = %uri, timeout = ?rpc_timeout, "mcp subscribe_resource timed out");
523 BlockError::Timeout(format!(
524 "subscribe_resource '{uri}' on '{name}' timed out after {rpc_timeout:?}"
525 ))
526 })?
527 .map_err(|e| {
528 warn!(server = %name, uri = %uri, error = %e, "mcp subscribe_resource failed");
529 BlockError::Mcp(format!("subscribe_resource '{uri}' on '{name}': {e}"))
530 })
531 }
532
533 pub async fn unsubscribe_resource(&self, name: &str, uri: &str) -> BlockResult<()> {
537 let srv = self.servers.get(name).ok_or_else(|| {
538 warn!(server = %name, uri = %uri, "mcp unsubscribe_resource on unknown server");
539 BlockError::Mcp(format!("no server named '{name}'"))
540 })?;
541 let rpc_timeout = self.rpc_timeout;
542 let params = UnsubscribeRequestParams::new(uri);
543 timeout(rpc_timeout, srv.unsubscribe(params))
544 .await
545 .map_err(|_| {
546 warn!(server = %name, uri = %uri, timeout = ?rpc_timeout, "mcp unsubscribe_resource timed out");
547 BlockError::Timeout(format!(
548 "unsubscribe_resource '{uri}' on '{name}' timed out after {rpc_timeout:?}"
549 ))
550 })?
551 .map_err(|e| {
552 warn!(server = %name, uri = %uri, error = %e, "mcp unsubscribe_resource failed");
553 BlockError::Mcp(format!("unsubscribe_resource '{uri}' on '{name}': {e}"))
554 })
555 }
556
557 pub async fn list_prompts(&self, name: &str) -> BlockResult<serde_json::Value> {
561 let srv = self.servers.get(name).ok_or_else(|| {
562 warn!(server = %name, "mcp list_prompts on unknown server");
563 BlockError::Mcp(format!("no server named '{name}'"))
564 })?;
565 let rpc_timeout = self.rpc_timeout;
566 let prompts = timeout(rpc_timeout, srv.list_all_prompts())
567 .await
568 .map_err(|_| {
569 warn!(server = %name, timeout = ?rpc_timeout, "mcp list_prompts timed out");
570 BlockError::Timeout(format!(
571 "list_prompts '{name}' timed out after {rpc_timeout:?}"
572 ))
573 })?
574 .map_err(|e| {
575 warn!(server = %name, error = %e, "mcp list_prompts failed");
576 BlockError::Mcp(format!("list_prompts '{name}': {e}"))
577 })?;
578 serde_json::to_value(&prompts)
579 .map_err(|e| BlockError::Mcp(format!("serialize list_prompts result: {e}")))
580 }
581
582 pub async fn get_prompt(
586 &self,
587 name: &str,
588 prompt_name: &str,
589 args: serde_json::Value,
590 ) -> BlockResult<serde_json::Value> {
591 let mut params = GetPromptRequestParams::new(prompt_name.to_string());
592 match args {
593 serde_json::Value::Object(obj) => {
594 params = params.with_arguments(obj);
595 }
596 serde_json::Value::Null => {}
597 other => {
598 let kind = match other {
599 serde_json::Value::Array(_) => "array",
600 serde_json::Value::String(_) => "string",
601 serde_json::Value::Number(_) => "number",
602 serde_json::Value::Bool(_) => "bool",
603 _ => "unknown",
604 };
605 return Err(BlockError::Mcp(format!(
606 "get_prompt '{prompt_name}' on '{name}': args must be a JSON object \
607 (got {kind})"
608 )));
609 }
610 }
611 let srv = self.servers.get(name).ok_or_else(|| {
612 warn!(server = %name, prompt = %prompt_name, "mcp get_prompt on unknown server");
613 BlockError::Mcp(format!("no server named '{name}'"))
614 })?;
615 let rpc_timeout = self.rpc_timeout;
616 let result = timeout(rpc_timeout, srv.get_prompt(params))
617 .await
618 .map_err(|_| {
619 warn!(server = %name, prompt = %prompt_name, timeout = ?rpc_timeout, "mcp get_prompt timed out");
620 BlockError::Timeout(format!(
621 "get_prompt '{prompt_name}' on '{name}' timed out after {rpc_timeout:?}"
622 ))
623 })?
624 .map_err(|e| {
625 warn!(server = %name, prompt = %prompt_name, error = %e, "mcp get_prompt failed");
626 BlockError::Mcp(format!("get_prompt '{prompt_name}' on '{name}': {e}"))
627 })?;
628 serde_json::to_value(&result)
629 .map_err(|e| BlockError::Mcp(format!("serialize get_prompt result: {e}")))
630 }
631
632 pub async fn complete(
641 &self,
642 name: &str,
643 ref_json: serde_json::Value,
644 arg_name: &str,
645 arg_value: &str,
646 ) -> BlockResult<serde_json::Value> {
647 let reference = match ref_json.get("type").and_then(|v| v.as_str()) {
651 Some("ref/prompt") => {
652 let prompt_name = ref_json.get("name").and_then(|v| v.as_str()).unwrap_or("");
653 Reference::for_prompt(prompt_name)
654 }
655 Some("ref/resource") => {
656 let uri = ref_json.get("uri").and_then(|v| v.as_str()).unwrap_or("");
657 Reference::for_resource(uri)
658 }
659 Some(kind) => {
660 warn!(server = %name, kind = ?kind, "mcp complete: invalid ref kind");
661 return Err(BlockError::Mcp(format!(
662 "complete on '{name}': invalid ref kind '{kind}', \
663 expected 'ref/prompt' or 'ref/resource'"
664 )));
665 }
666 None => {
667 warn!(server = %name, "mcp complete: ref missing 'type' field");
668 return Err(BlockError::Mcp(format!(
669 "complete on '{name}': ref object has no 'type' field"
670 )));
671 }
672 };
673 let params = CompleteRequestParams::new(
674 reference,
675 ArgumentInfo {
676 name: arg_name.to_string(),
677 value: arg_value.to_string(),
678 },
679 );
680 let srv = self.servers.get(name).ok_or_else(|| {
681 warn!(server = %name, "mcp complete on unknown server");
682 BlockError::Mcp(format!("no server named '{name}'"))
683 })?;
684 let rpc_timeout = self.rpc_timeout;
685 let result = timeout(rpc_timeout, srv.complete(params))
686 .await
687 .map_err(|_| {
688 warn!(server = %name, timeout = ?rpc_timeout, "mcp complete timed out");
689 BlockError::Timeout(format!(
690 "complete on '{name}' timed out after {rpc_timeout:?}"
691 ))
692 })?
693 .map_err(|e| {
694 warn!(server = %name, error = %e, "mcp complete failed");
695 BlockError::Mcp(format!("complete on '{name}': {e}"))
696 })?;
697 serde_json::to_value(&result)
698 .map_err(|e| BlockError::Mcp(format!("serialize complete result: {e}")))
699 }
700
701 pub fn server_info(&self, name: &str) -> BlockResult<serde_json::Value> {
708 let srv = self.servers.get(name).ok_or_else(|| {
709 warn!(server = %name, "mcp server_info on unknown server");
710 BlockError::Mcp(format!("no server named '{name}'"))
711 })?;
712 let info = srv.peer_info().ok_or_else(|| {
713 warn!(server = %name, "mcp server_info: server not yet initialized");
714 BlockError::Mcp(format!("server '{name}' not yet initialized"))
715 })?;
716 serde_json::to_value(info)
717 .map_err(|e| BlockError::Mcp(format!("serialize server_info '{name}': {e}")))
718 }
719
720 pub fn send_cancelled(&self, name: &str, request_id: Option<i64>) {
734 let id = match request_id {
737 Some(id) => id,
738 None => return,
739 };
740 let Some(srv) = self.servers.get(name) else {
741 warn!(server = %name, "send_cancelled: unknown server, ignoring");
742 return;
743 };
744 let peer = srv.peer().clone();
747 let name_owned = name.to_string();
748 tokio::spawn(async move {
749 let notification = CancelledNotification::new(CancelledNotificationParam {
752 request_id: NumberOrString::Number(id),
753 reason: Some("cancelled".to_owned()),
754 });
755 if let Err(e) = peer.send_notification(notification.into()).await {
756 warn!(
757 server = %name_owned,
758 request_id = %id,
759 error = %e,
760 "send_cancelled: peer send_notification failed"
761 );
762 }
763 });
764 }
765
766 pub fn notify_roots_list_changed(&self, name: &str) {
780 let Some(srv) = self.servers.get(name) else {
781 warn!(server = %name, "notify_roots_list_changed: unknown server, ignoring");
782 return;
783 };
784 let peer = srv.peer().clone();
787 let name_owned = name.to_string();
788 tokio::spawn(async move {
789 let notification = RootsListChangedNotification::default();
793 if let Err(e) = peer.send_notification(notification.into()).await {
794 warn!(
795 server = %name_owned,
796 error = %e,
797 "notify_roots_list_changed: peer send_notification failed"
798 );
799 }
800 });
801 }
802}
803
804impl Default for McpManager {
805 fn default() -> Self {
806 Self::new()
807 }
808}
809
810#[cfg(test)]
811mod tests {
812 use super::*;
813
814 #[tokio::test]
815 async fn new_manager_is_empty() {
816 let mgr = McpManager::new();
817 assert!(mgr.servers.is_empty());
818 }
819
820 #[tokio::test]
821 async fn with_rpc_timeout_rejects_zero() {
822 let err = match McpManager::with_rpc_timeout(Duration::ZERO) {
828 Ok(_) => panic!("Duration::ZERO must be rejected"),
829 Err(e) => e,
830 };
831 assert!(
832 err.to_string().contains("rpc_timeout must be > 0"),
833 "unexpected error: {err}",
834 );
835 }
836
837 #[tokio::test]
838 async fn with_rpc_timeout_accepts_positive() {
839 let mgr = match McpManager::with_rpc_timeout(Duration::from_millis(1)) {
840 Ok(m) => m,
841 Err(e) => panic!("positive timeout must be accepted: {e}"),
842 };
843 assert!(mgr.servers.is_empty());
844 }
845
846 #[tokio::test]
847 async fn disconnect_nonexistent_is_ok() {
848 let mut mgr = McpManager::new();
849 assert!(mgr.disconnect("ghost").await.is_ok());
850 }
851
852 #[tokio::test]
853 async fn call_unknown_server_returns_error() {
854 let mgr = McpManager::new();
858 let res = mgr.call_tool("none", "dummy", serde_json::json!({})).await;
859 assert!(res.is_err());
860 }
861
862 #[tokio::test]
863 async fn list_tools_takes_shared_receiver() {
864 let mgr = McpManager::new();
866 let res = mgr.list_tools("none").await;
867 assert!(res.is_err());
868 }
869
870 #[tokio::test]
871 async fn disconnect_all_empties_map() {
872 let mut mgr = McpManager::new();
873 mgr.disconnect_all()
874 .await
875 .expect("disconnect_all on empty manager should succeed");
876 assert!(mgr.servers.is_empty());
877 }
878
879 #[tokio::test]
880 async fn call_tool_rejects_non_object_arguments() {
881 let mgr = McpManager::new();
884 for bad in [
885 serde_json::json!([1, 2, 3]),
886 serde_json::json!("string"),
887 serde_json::json!(42),
888 serde_json::json!(true),
889 ] {
890 let res = mgr.call_tool("anything", "dummy", bad.clone()).await;
891 let err = res.expect_err("non-object args must error");
892 let msg = err.to_string();
893 assert!(
894 msg.contains("arguments must be a JSON object"),
895 "unexpected error for {bad}: {msg}",
896 );
897 }
898 }
899
900 #[tokio::test]
901 async fn call_tool_accepts_null_arguments_as_absent() {
902 let mgr = McpManager::new();
905 let res = mgr
906 .call_tool("ghost", "dummy", serde_json::Value::Null)
907 .await;
908 let err = res.expect_err("expected no-server error, not arg-shape error");
909 assert!(
910 err.to_string().contains("no server named"),
911 "Null args should reach the lookup step: {err}",
912 );
913 }
914}
915
916#[cfg(test)]
930mod concurrency_tests {
931 use super::*;
932 use std::sync::Arc;
933 use std::time::Instant;
934 use tokio::sync::RwLock;
935
936 use rmcp::{
937 model::{CallToolRequestParams, CallToolResult, Content, ServerCapabilities, ServerInfo},
938 service::{MaybeSendFuture, RequestContext},
939 ErrorData as McpError, RoleServer, ServerHandler, ServiceExt,
940 };
941
942 #[derive(Clone)]
946 struct SlowToolServer {
947 delay: Duration,
948 }
949
950 impl ServerHandler for SlowToolServer {
951 fn get_info(&self) -> ServerInfo {
952 ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
953 }
954
955 fn call_tool(
956 &self,
957 _params: CallToolRequestParams,
958 _ctx: RequestContext<RoleServer>,
959 ) -> impl std::future::Future<Output = Result<CallToolResult, McpError>> + MaybeSendFuture + '_
960 {
961 let delay = self.delay;
962 async move {
963 tokio::time::sleep(delay).await;
964 Ok(CallToolResult::success(vec![Content::text("ok")]))
965 }
966 }
967 }
968
969 async fn attach_slow_server(mgr: &mut McpManager, name: &str, delay: Duration) {
973 let (server_side, client_side) = tokio::io::duplex(8192);
974
975 let server = SlowToolServer { delay };
976 tokio::spawn(async move {
977 if let Ok(running) = server.serve(server_side).await {
978 let _ = running.waiting().await;
979 }
980 });
981
982 let handler = AgentBlockClientHandler::new();
983 let running = handler
984 .serve(client_side)
985 .await
986 .expect("client handshake should succeed over duplex");
987 mgr.servers.insert(name.to_string(), running);
988 }
989
990 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
991 async fn concurrent_call_tool_same_server_does_not_serialize() {
992 let delay = Duration::from_millis(300);
993 let mgr = Arc::new(RwLock::new(McpManager::new()));
994
995 attach_slow_server(&mut *mgr.write().await, "slow", delay).await;
996
997 let start = Instant::now();
998 let a = {
999 let mgr = Arc::clone(&mgr);
1000 async move {
1001 mgr.read()
1002 .await
1003 .call_tool("slow", "slow_tool", serde_json::json!({}))
1004 .await
1005 }
1006 };
1007 let b = {
1008 let mgr = Arc::clone(&mgr);
1009 async move {
1010 mgr.read()
1011 .await
1012 .call_tool("slow", "slow_tool", serde_json::json!({}))
1013 .await
1014 }
1015 };
1016 let (r1, r2) = tokio::join!(a, b);
1017 let elapsed = start.elapsed();
1018
1019 r1.expect("first call succeeds");
1020 r2.expect("second call succeeds");
1021
1022 let serialized_budget = delay * 2 - Duration::from_millis(80);
1026 assert!(
1027 elapsed < serialized_budget,
1028 "concurrent call_tool appears serialized: elapsed={:?}, serialized_budget={:?}",
1029 elapsed,
1030 serialized_budget,
1031 );
1032 }
1033
1034 #[tokio::test]
1035 async fn two_reads_coexist_on_rwlock() {
1036 let mgr = Arc::new(RwLock::new(McpManager::new()));
1040 let _g1 = mgr.read().await;
1041 assert!(
1042 mgr.try_read().is_ok(),
1043 "RwLock rejected a concurrent second read guard",
1044 );
1045 }
1046
1047 #[tokio::test]
1048 async fn write_blocks_while_read_held() {
1049 let mgr = Arc::new(RwLock::new(McpManager::new()));
1050 let _g1 = mgr.read().await;
1051 assert!(
1052 mgr.try_write().is_err(),
1053 "write lock acquired while a read guard was held",
1054 );
1055 }
1056
1057 #[derive(Clone)]
1060 struct IsErrorServer;
1061
1062 impl ServerHandler for IsErrorServer {
1063 fn get_info(&self) -> ServerInfo {
1064 ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
1065 }
1066
1067 async fn call_tool(
1068 &self,
1069 _params: CallToolRequestParams,
1070 _ctx: RequestContext<RoleServer>,
1071 ) -> Result<CallToolResult, McpError> {
1072 Ok(CallToolResult::error(vec![Content::text("tool blew up")]))
1073 }
1074 }
1075
1076 async fn attach_is_error_server(mgr: &mut McpManager, name: &str) {
1077 let (server_side, client_side) = tokio::io::duplex(8192);
1078 tokio::spawn(async move {
1079 if let Ok(running) = IsErrorServer.serve(server_side).await {
1080 let _ = running.waiting().await;
1081 }
1082 });
1083 let handler = AgentBlockClientHandler::new();
1084 let running = handler.serve(client_side).await.expect("handshake");
1085 mgr.servers.insert(name.to_string(), running);
1086 }
1087
1088 #[tokio::test]
1089 async fn is_error_is_passed_through_in_ok_branch() {
1090 let mut mgr = McpManager::new();
1095 attach_is_error_server(&mut mgr, "boom").await;
1096
1097 let val = mgr
1098 .call_tool("boom", "explode", serde_json::json!({}))
1099 .await
1100 .expect("RPC succeeds even when isError=true");
1101
1102 assert_eq!(
1103 val.get("isError").and_then(|v| v.as_bool()),
1104 Some(true),
1105 "isError must be preserved in Ok branch: {val}",
1106 );
1107 let content = val.get("content").and_then(|v| v.as_array()).cloned();
1108 assert!(
1109 content.as_ref().map(|c| !c.is_empty()).unwrap_or(false),
1110 "content blocks must be forwarded alongside isError: {val:?}",
1111 );
1112 }
1113}
1114
1115#[cfg(test)]
1119mod rich_tests {
1120 use super::*;
1121 use rmcp::{
1122 model::{
1123 CompleteRequestParams, CompleteResult, CompletionInfo, GetPromptRequestParams,
1124 GetPromptResult, ListPromptsResult, ListResourceTemplatesResult, ListResourcesResult,
1125 NumberOrString, PaginatedRequestParams, ProgressNotificationParam, ProgressToken,
1126 Prompt, PromptMessage, PromptMessageRole, RawResource, RawResourceTemplate,
1127 ReadResourceRequestParams, ReadResourceResult, Reference, ResourceContents,
1128 ServerCapabilities, ServerInfo,
1129 },
1130 service::{MaybeSendFuture, RequestContext},
1131 ErrorData as McpError, RoleServer, ServerHandler, ServiceExt,
1132 };
1133 use std::sync::Arc;
1134 use tokio::sync::RwLock;
1135
1136 #[derive(Clone)]
1139 struct ResourceTestServer;
1140
1141 impl ServerHandler for ResourceTestServer {
1142 fn get_info(&self) -> ServerInfo {
1143 ServerInfo::new(ServerCapabilities::builder().enable_resources().build())
1144 }
1145
1146 fn list_resources(
1147 &self,
1148 _request: Option<PaginatedRequestParams>,
1149 _ctx: RequestContext<RoleServer>,
1150 ) -> impl std::future::Future<Output = Result<ListResourcesResult, McpError>>
1151 + MaybeSendFuture
1152 + '_ {
1153 let resources = vec![
1154 rmcp::model::Resource::new(
1155 RawResource::new("file:///hello.txt", "hello.txt"),
1156 None,
1157 ),
1158 rmcp::model::Resource::new(
1159 RawResource::new("file:///world.txt", "world.txt"),
1160 None,
1161 ),
1162 ];
1163 std::future::ready(Ok(ListResourcesResult::with_all_items(resources)))
1164 }
1165
1166 fn read_resource(
1167 &self,
1168 request: ReadResourceRequestParams,
1169 _ctx: RequestContext<RoleServer>,
1170 ) -> impl std::future::Future<Output = Result<ReadResourceResult, McpError>> + MaybeSendFuture + '_
1171 {
1172 let uri = request.uri.clone();
1173 let text = format!("content of {uri}");
1174 std::future::ready(Ok(ReadResourceResult::new(vec![ResourceContents::text(
1175 text, uri,
1176 )])))
1177 }
1178
1179 fn list_resource_templates(
1180 &self,
1181 _request: Option<PaginatedRequestParams>,
1182 _ctx: RequestContext<RoleServer>,
1183 ) -> impl std::future::Future<Output = Result<ListResourceTemplatesResult, McpError>>
1184 + MaybeSendFuture
1185 + '_ {
1186 let templates = vec![
1187 rmcp::model::ResourceTemplate::new(
1188 RawResourceTemplate::new("file:///{name}.txt", "file-template"),
1189 None,
1190 ),
1191 rmcp::model::ResourceTemplate::new(
1192 RawResourceTemplate::new("db:///{table}/{id}", "db-template"),
1193 None,
1194 ),
1195 ];
1196 std::future::ready(Ok(ListResourceTemplatesResult::with_all_items(templates)))
1197 }
1198 }
1199
1200 #[derive(Clone)]
1201 struct PromptTestServer;
1202
1203 impl ServerHandler for PromptTestServer {
1204 fn get_info(&self) -> ServerInfo {
1205 ServerInfo::new(ServerCapabilities::builder().enable_prompts().build())
1206 }
1207
1208 fn list_prompts(
1209 &self,
1210 _request: Option<PaginatedRequestParams>,
1211 _ctx: RequestContext<RoleServer>,
1212 ) -> impl std::future::Future<Output = Result<ListPromptsResult, McpError>> + MaybeSendFuture + '_
1213 {
1214 let prompts = vec![
1215 Prompt::new("greet", Some("Greeting prompt"), None),
1216 Prompt::new("farewell", Some("Farewell prompt"), None),
1217 ];
1218 std::future::ready(Ok(ListPromptsResult::with_all_items(prompts)))
1219 }
1220
1221 fn get_prompt(
1222 &self,
1223 request: GetPromptRequestParams,
1224 _ctx: RequestContext<RoleServer>,
1225 ) -> impl std::future::Future<Output = Result<GetPromptResult, McpError>> + MaybeSendFuture + '_
1226 {
1227 let name = request.name.clone();
1228 let message = PromptMessage::new_text(
1229 PromptMessageRole::User,
1230 format!("This is the '{name}' prompt."),
1231 );
1232 std::future::ready(Ok(GetPromptResult::new(vec![message])))
1233 }
1234 }
1235
1236 async fn attach_resource_server(mgr: &mut McpManager, name: &str) {
1239 let (server_side, client_side) = tokio::io::duplex(65536);
1240 tokio::spawn(async move {
1241 if let Ok(running) = ResourceTestServer.serve(server_side).await {
1242 let _ = running.waiting().await;
1243 }
1244 });
1245 let handler = AgentBlockClientHandler::new();
1246 let running = handler.serve(client_side).await.expect("handshake");
1247 mgr.servers.insert(name.to_string(), running);
1248 }
1249
1250 async fn attach_prompt_server(mgr: &mut McpManager, name: &str) {
1251 let (server_side, client_side) = tokio::io::duplex(65536);
1252 tokio::spawn(async move {
1253 if let Ok(running) = PromptTestServer.serve(server_side).await {
1254 let _ = running.waiting().await;
1255 }
1256 });
1257 let handler = AgentBlockClientHandler::new();
1258 let running = handler.serve(client_side).await.expect("handshake");
1259 mgr.servers.insert(name.to_string(), running);
1260 }
1261
1262 #[derive(Clone)]
1263 struct CompleteTestServer;
1264
1265 impl ServerHandler for CompleteTestServer {
1266 fn get_info(&self) -> ServerInfo {
1267 ServerInfo::new(
1269 ServerCapabilities::builder()
1270 .enable_prompts()
1271 .enable_resources()
1272 .build(),
1273 )
1274 }
1275
1276 async fn complete(
1277 &self,
1278 request: CompleteRequestParams,
1279 _ctx: RequestContext<RoleServer>,
1280 ) -> Result<CompleteResult, McpError> {
1281 let info = match &request.r#ref {
1282 Reference::Prompt(_) => CompletionInfo::with_pagination(
1283 vec!["alice".to_string(), "alpha".to_string()],
1284 Some(2),
1285 false,
1286 )
1287 .expect("valid completion info"),
1288 Reference::Resource(_) => CompletionInfo::with_pagination(
1289 vec!["file:///a.txt".to_string()],
1290 Some(1),
1291 false,
1292 )
1293 .expect("valid completion info"),
1294 };
1295 Ok(CompleteResult::new(info))
1296 }
1297 }
1298
1299 async fn attach_complete_server(mgr: &mut McpManager, name: &str) {
1300 let (server_side, client_side) = tokio::io::duplex(65536);
1301 tokio::spawn(async move {
1302 if let Ok(running) = CompleteTestServer.serve(server_side).await {
1303 let _ = running.waiting().await;
1304 }
1305 });
1306 let handler = AgentBlockClientHandler::new();
1307 let running = handler.serve(client_side).await.expect("handshake");
1308 mgr.servers.insert(name.to_string(), running);
1309 }
1310
1311 #[tokio::test]
1314 async fn list_resources_returns_all_resources() {
1315 let mut mgr = McpManager::new();
1316 attach_resource_server(&mut mgr, "res").await;
1317
1318 let result = mgr
1319 .list_resources("res")
1320 .await
1321 .expect("list_resources should succeed");
1322
1323 let arr = result.as_array().expect("should be JSON array");
1324 assert_eq!(arr.len(), 2, "expected 2 resources: {result}");
1325 }
1326
1327 #[tokio::test]
1328 async fn list_resources_unknown_server_returns_error() {
1329 let mgr = McpManager::new();
1330 let err = mgr
1331 .list_resources("ghost")
1332 .await
1333 .expect_err("unknown server must error");
1334 assert!(
1335 err.to_string().contains("no server named"),
1336 "unexpected error: {err}"
1337 );
1338 }
1339
1340 #[tokio::test]
1343 async fn list_resource_templates_returns_all_templates() {
1344 let mut mgr = McpManager::new();
1345 attach_resource_server(&mut mgr, "res").await;
1346
1347 let result = mgr
1348 .list_resource_templates("res")
1349 .await
1350 .expect("list_resource_templates should succeed");
1351
1352 let arr = result.as_array().expect("should be JSON array");
1353 assert_eq!(arr.len(), 2, "expected 2 templates: {result}");
1354
1355 let uri_template = arr[0]
1356 .get("uriTemplate")
1357 .and_then(|v| v.as_str())
1358 .expect("first template should have uriTemplate");
1359 assert!(
1360 uri_template.contains("{name}"),
1361 "uriTemplate should contain placeholder: {uri_template}"
1362 );
1363 }
1364
1365 #[tokio::test]
1366 async fn list_resource_templates_unknown_server_returns_error() {
1367 let mgr = McpManager::new();
1368 let err = mgr
1369 .list_resource_templates("ghost")
1370 .await
1371 .expect_err("unknown server must error");
1372 assert!(
1373 err.to_string().contains("no server named"),
1374 "unexpected error: {err}"
1375 );
1376 }
1377
1378 #[tokio::test]
1381 async fn read_resource_returns_contents() {
1382 let mut mgr = McpManager::new();
1383 attach_resource_server(&mut mgr, "res").await;
1384
1385 let result = mgr
1386 .read_resource("res", "file:///hello.txt")
1387 .await
1388 .expect("read_resource should succeed");
1389
1390 let contents = result
1391 .get("contents")
1392 .and_then(|v| v.as_array())
1393 .expect("should have contents array");
1394 assert!(!contents.is_empty(), "contents must not be empty: {result}");
1395
1396 let text = contents[0]
1397 .get("text")
1398 .and_then(|v| v.as_str())
1399 .expect("should have text field");
1400 assert!(
1401 text.contains("file:///hello.txt"),
1402 "text should contain uri: {text}"
1403 );
1404 }
1405
1406 #[tokio::test]
1407 async fn read_resource_unknown_server_returns_error() {
1408 let mgr = McpManager::new();
1409 let err = mgr
1410 .read_resource("ghost", "file:///any.txt")
1411 .await
1412 .expect_err("unknown server must error");
1413 assert!(
1414 err.to_string().contains("no server named"),
1415 "unexpected error: {err}"
1416 );
1417 }
1418
1419 #[tokio::test]
1422 async fn list_prompts_returns_all_prompts() {
1423 let mut mgr = McpManager::new();
1424 attach_prompt_server(&mut mgr, "prm").await;
1425
1426 let result = mgr
1427 .list_prompts("prm")
1428 .await
1429 .expect("list_prompts should succeed");
1430
1431 let arr = result.as_array().expect("should be JSON array");
1432 assert_eq!(arr.len(), 2, "expected 2 prompts: {result}");
1433 }
1434
1435 #[tokio::test]
1436 async fn list_prompts_unknown_server_returns_error() {
1437 let mgr = McpManager::new();
1438 let err = mgr
1439 .list_prompts("ghost")
1440 .await
1441 .expect_err("unknown server must error");
1442 assert!(
1443 err.to_string().contains("no server named"),
1444 "unexpected error: {err}"
1445 );
1446 }
1447
1448 #[tokio::test]
1451 async fn get_prompt_returns_messages() {
1452 let mut mgr = McpManager::new();
1453 attach_prompt_server(&mut mgr, "prm").await;
1454
1455 let result = mgr
1456 .get_prompt("prm", "greet", serde_json::Value::Null)
1457 .await
1458 .expect("get_prompt should succeed");
1459
1460 let messages = result
1461 .get("messages")
1462 .and_then(|v| v.as_array())
1463 .expect("should have messages array");
1464 assert!(!messages.is_empty(), "messages must not be empty: {result}");
1465 }
1466
1467 #[tokio::test]
1468 async fn get_prompt_rejects_non_object_args() {
1469 let mgr = McpManager::new();
1470 let err = mgr
1471 .get_prompt("any", "greet", serde_json::json!([1, 2]))
1472 .await
1473 .expect_err("array args must error");
1474 assert!(
1475 err.to_string().contains("args must be a JSON object"),
1476 "unexpected error: {err}"
1477 );
1478 }
1479
1480 #[tokio::test]
1481 async fn get_prompt_unknown_server_returns_error() {
1482 let mgr = McpManager::new();
1483 let err = mgr
1484 .get_prompt("ghost", "greet", serde_json::Value::Null)
1485 .await
1486 .expect_err("unknown server must error");
1487 assert!(
1488 err.to_string().contains("no server named"),
1489 "unexpected error: {err}"
1490 );
1491 }
1492
1493 #[tokio::test]
1496 async fn complete_prompt_ref_returns_values() {
1497 let mut mgr = McpManager::new();
1498 attach_complete_server(&mut mgr, "cmp").await;
1499
1500 let ref_json = serde_json::json!({ "type": "ref/prompt", "name": "greet" });
1501 let result = mgr
1502 .complete("cmp", ref_json, "name", "al")
1503 .await
1504 .expect("complete with prompt ref should succeed");
1505
1506 let completion = result
1507 .get("completion")
1508 .expect("result should have 'completion' key");
1509 let values = completion
1510 .get("values")
1511 .and_then(|v| v.as_array())
1512 .expect("completion should have 'values' array");
1513 assert!(
1514 !values.is_empty(),
1515 "values must not be empty for prompt ref: {result}"
1516 );
1517 }
1518
1519 #[tokio::test]
1520 async fn complete_resource_ref_returns_values() {
1521 let mut mgr = McpManager::new();
1522 attach_complete_server(&mut mgr, "cmp").await;
1523
1524 let ref_json = serde_json::json!({ "type": "ref/resource", "uri": "file:///a.txt" });
1525 let result = mgr
1526 .complete("cmp", ref_json, "uri", "file:///")
1527 .await
1528 .expect("complete with resource ref should succeed");
1529
1530 let completion = result
1531 .get("completion")
1532 .expect("result should have 'completion' key");
1533 let values = completion
1534 .get("values")
1535 .and_then(|v| v.as_array())
1536 .expect("completion should have 'values' array");
1537 assert!(
1538 !values.is_empty(),
1539 "values must not be empty for resource ref: {result}"
1540 );
1541 }
1542
1543 #[tokio::test]
1544 async fn complete_unknown_server_returns_error() {
1545 let mgr = McpManager::new();
1546 let ref_json = serde_json::json!({ "type": "ref/prompt", "name": "greet" });
1547 let err = mgr
1548 .complete("ghost", ref_json, "name", "al")
1549 .await
1550 .expect_err("unknown server must error");
1551 assert!(
1552 err.to_string().contains("no server named"),
1553 "unexpected error: {err}"
1554 );
1555 }
1556
1557 #[tokio::test]
1558 async fn complete_invalid_ref_kind_returns_error() {
1559 let mgr = McpManager::new();
1560 let ref_json = serde_json::json!({ "type": "ref/unknown", "name": "x" });
1561 let err = mgr
1562 .complete("any", ref_json, "name", "x")
1563 .await
1564 .expect_err("invalid ref kind must error");
1565 assert!(
1566 err.to_string().contains("invalid ref kind"),
1567 "unexpected error: {err}"
1568 );
1569 }
1570
1571 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1576 async fn concurrent_list_resources_and_list_prompts() {
1577 let mgr = Arc::new(RwLock::new(McpManager::new()));
1578
1579 {
1580 let mut w = mgr.write().await;
1581 attach_resource_server(&mut w, "res").await;
1582 attach_prompt_server(&mut w, "prm").await;
1583 }
1584
1585 let mgr_a = Arc::clone(&mgr);
1586 let mgr_b = Arc::clone(&mgr);
1587
1588 let (r1, r2) = tokio::join!(
1589 async move { mgr_a.read().await.list_resources("res").await },
1590 async move { mgr_b.read().await.list_prompts("prm").await },
1591 );
1592
1593 r1.expect("list_resources should succeed concurrently");
1594 r2.expect("list_prompts should succeed concurrently");
1595 }
1596
1597 #[test]
1600 fn mark_on_progress_sets_flag_accessible_by_handler() {
1601 let handler = AgentBlockClientHandler::new();
1602 handler.ensure_server("srv");
1603 assert!(
1604 !handler
1605 .registry
1606 .lock()
1607 .unwrap()
1608 .get("srv")
1609 .unwrap()
1610 .on_progress
1611 );
1612 handler.mark_on_progress("srv");
1613 assert!(
1614 handler
1615 .registry
1616 .lock()
1617 .unwrap()
1618 .get("srv")
1619 .unwrap()
1620 .on_progress
1621 );
1622 }
1623
1624 #[tokio::test]
1628 async fn connect_http_unreachable_returns_error() {
1629 let mut mgr = McpManager::with_rpc_timeout(Duration::from_millis(100))
1630 .expect("non-zero timeout must be accepted");
1631
1632 let err = mgr
1633 .connect_http(
1634 "test",
1635 "http://127.0.0.1:19999/mcp",
1636 serde_json::Value::Null,
1637 )
1638 .await
1639 .expect_err("unreachable URL must produce an error");
1640
1641 let msg = err.to_string();
1642 assert!(
1643 msg.contains("http connect") || msg.contains("timed out"),
1644 "unexpected error: {msg}"
1645 );
1646 }
1647
1648 #[test]
1651 fn mark_on_log_sets_flag_accessible_by_handler() {
1652 let handler = AgentBlockClientHandler::new();
1653 handler.ensure_server("log-srv");
1654 assert!(
1655 !handler
1656 .registry
1657 .lock()
1658 .unwrap()
1659 .get("log-srv")
1660 .unwrap()
1661 .on_log
1662 );
1663 handler.mark_on_log("log-srv");
1664 assert!(
1665 handler
1666 .registry
1667 .lock()
1668 .unwrap()
1669 .get("log-srv")
1670 .unwrap()
1671 .on_log
1672 );
1673 }
1674
1675 #[test]
1676 fn mark_sampling_sets_flag_accessible_by_handler() {
1677 let handler = AgentBlockClientHandler::new();
1678 handler.ensure_server("samp-srv");
1679 assert!(
1680 !handler
1681 .registry
1682 .lock()
1683 .unwrap()
1684 .get("samp-srv")
1685 .unwrap()
1686 .sampling
1687 );
1688 handler.mark_sampling("samp-srv");
1689 assert!(
1690 handler
1691 .registry
1692 .lock()
1693 .unwrap()
1694 .get("samp-srv")
1695 .unwrap()
1696 .sampling
1697 );
1698 }
1699
1700 #[tokio::test]
1704 async fn send_cancelled_unknown_server_is_no_op() {
1705 let mgr = McpManager::new();
1706 mgr.send_cancelled("ghost", Some(42));
1708 }
1709
1710 #[tokio::test]
1712 async fn send_cancelled_live_server_does_not_panic() {
1713 let mut mgr = McpManager::new();
1714 attach_resource_server(&mut mgr, "res").await;
1715 mgr.send_cancelled("res", Some(0));
1717 tokio::time::sleep(Duration::from_millis(50)).await;
1719 }
1720
1721 #[test]
1731 fn handler_server_name_reset_after_simulated_connect() {
1732 let mut mgr = McpManager::new();
1733 mgr.handler.ensure_server("srv-x");
1735 mgr.handler.server_name = Some("srv-x".to_string());
1736 let cloned = mgr.handler.clone();
1737 mgr.handler.server_name = None;
1738
1739 assert!(
1741 mgr.handler.server_name.is_none(),
1742 "template server_name must be None after simulated connect"
1743 );
1744 assert_eq!(
1745 cloned.server_name.as_deref(),
1746 Some("srv-x"),
1747 "cloned handler must carry the server_name"
1748 );
1749 let guard = mgr.handler.registry.lock().unwrap();
1751 assert!(
1752 guard.contains_key("srv-x"),
1753 "registry must have entry after ensure_server"
1754 );
1755 }
1756
1757 #[tokio::test]
1763 async fn on_progress_no_op_when_no_isle() {
1764 let handler = AgentBlockClientHandler::new();
1765 handler.ensure_server("srv");
1766 handler.mark_on_progress("srv");
1767
1768 let params = ProgressNotificationParam {
1770 progress_token: ProgressToken(NumberOrString::String("tok-1".into())),
1771 progress: 0.5,
1772 total: Some(1.0),
1773 message: None,
1774 };
1775
1776 let guard = handler.registry.lock().unwrap();
1781 assert!(
1782 guard.get("srv").unwrap().on_progress,
1783 "on_progress flag must be set after mark_on_progress"
1784 );
1785 drop(guard);
1786
1787 let _ = params;
1790 }
1791
1792 #[tokio::test]
1795 async fn server_info_unknown_server_returns_error() {
1796 let mgr = McpManager::new();
1797 let err = mgr
1798 .server_info("ghost")
1799 .expect_err("unknown server must error");
1800 assert!(
1801 err.to_string().contains("no server named"),
1802 "unexpected error: {err}"
1803 );
1804 }
1805
1806 #[tokio::test]
1807 async fn server_info_returns_capabilities_for_resource_server() {
1808 let mut mgr = McpManager::new();
1809 attach_resource_server(&mut mgr, "res").await;
1810
1811 let info = mgr
1812 .server_info("res")
1813 .expect("server_info should succeed after handshake");
1814
1815 let caps = info
1816 .get("capabilities")
1817 .expect("InitializeResult must have capabilities field");
1818 assert!(
1819 caps.get("resources").is_some(),
1820 "resource server must advertise resources capability: {caps}"
1821 );
1822 }
1823
1824 #[tokio::test]
1825 async fn server_info_returns_capabilities_for_prompt_server() {
1826 let mut mgr = McpManager::new();
1827 attach_prompt_server(&mut mgr, "prm").await;
1828
1829 let info = mgr
1830 .server_info("prm")
1831 .expect("server_info should succeed after handshake");
1832
1833 let caps = info
1834 .get("capabilities")
1835 .expect("InitializeResult must have capabilities field");
1836 assert!(
1837 caps.get("prompts").is_some(),
1838 "prompt server must advertise prompts capability: {caps}"
1839 );
1840 }
1841
1842 #[derive(Clone)]
1846 struct LoggingCapableServer;
1847
1848 impl ServerHandler for LoggingCapableServer {
1849 #[allow(deprecated)]
1853 fn get_info(&self) -> ServerInfo {
1854 ServerInfo::new(
1855 ServerCapabilities::builder()
1856 .enable_tools()
1857 .enable_logging()
1858 .build(),
1859 )
1860 }
1861 }
1862
1863 async fn attach_logging_server(mgr: &mut McpManager, name: &str) {
1864 let (server_side, client_side) = tokio::io::duplex(65536);
1865 tokio::spawn(async move {
1866 if let Ok(running) = LoggingCapableServer.serve(server_side).await {
1867 let _ = running.waiting().await;
1868 }
1869 });
1870 let handler = AgentBlockClientHandler::new();
1871 let running = handler.serve(client_side).await.expect("handshake");
1872 mgr.servers.insert(name.to_string(), running);
1873 }
1874
1875 #[tokio::test]
1880 async fn server_info_returns_logging_capability_when_declared() {
1881 let mut mgr = McpManager::new();
1882 attach_logging_server(&mut mgr, "log").await;
1883
1884 let info = mgr
1885 .server_info("log")
1886 .expect("server_info should succeed after handshake");
1887
1888 let caps = info
1889 .get("capabilities")
1890 .expect("InitializeResult must have capabilities field");
1891 assert!(
1892 caps.get("logging").is_some(),
1893 "logging-capable server must advertise logging capability: {caps}"
1894 );
1895 }
1896
1897 #[tokio::test]
1901 async fn server_info_has_no_logging_capability_for_tool_only_server() {
1902 let mut mgr = McpManager::new();
1903 attach_resource_server(&mut mgr, "res").await;
1904
1905 let info = mgr
1906 .server_info("res")
1907 .expect("server_info should succeed after handshake");
1908
1909 let caps = info
1910 .get("capabilities")
1911 .expect("InitializeResult must have capabilities field");
1912 assert!(
1913 caps.get("logging").is_none(),
1914 "resource-only server must not advertise logging capability: {caps}"
1915 );
1916 }
1917
1918 #[tokio::test]
1924 async fn call_tool_succeeds_with_and_without_progress_handler() {
1925 let mut mgr = McpManager::new();
1926 attach_resource_server(&mut mgr, "srv").await;
1927
1928 mgr.list_resources("srv")
1930 .await
1931 .expect("list_resources without handler should succeed");
1932
1933 mgr.handler.mark_on_progress("srv");
1935 mgr.list_resources("srv")
1936 .await
1937 .expect("list_resources with handler should succeed");
1938 }
1939
1940 #[derive(Clone)]
1948 struct RootsTestServer;
1949
1950 impl ServerHandler for RootsTestServer {
1951 fn get_info(&self) -> ServerInfo {
1952 ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
1953 }
1954
1955 #[allow(deprecated)]
1959 async fn call_tool(
1960 &self,
1961 _params: rmcp::model::CallToolRequestParams,
1962 ctx: RequestContext<RoleServer>,
1963 ) -> Result<rmcp::model::CallToolResult, McpError> {
1964 let roots_result = ctx.peer.list_roots().await.map_err(|e| {
1968 McpError::internal_error(format!("server list_roots failed: {e}"), None)
1969 })?;
1970 let count = roots_result.roots.len();
1972 let first_uri = roots_result
1973 .roots
1974 .first()
1975 .map(|r| r.uri.as_str())
1976 .unwrap_or("(none)");
1977 Ok(rmcp::model::CallToolResult::success(vec![
1978 rmcp::model::Content::text(format!("roots:{count}:{first_uri}")),
1979 ]))
1980 }
1981 }
1982
1983 async fn attach_roots_server_with_isle(
1988 mgr: &mut McpManager,
1989 name: &str,
1990 ) -> mlua_isle::AsyncIsleDriver {
1991 use mlua_isle::AsyncIsle;
1992
1993 let (isle, driver) = AsyncIsle::spawn(|_lua: &mlua::Lua| Ok(()))
1995 .await
1996 .expect("AsyncIsle::spawn should succeed");
1997
1998 let name_owned = name.to_string();
1999 isle.exec(move |lua| {
2000 handler::install_mcp_dispatcher_on_handler_isle(lua)
2001 .map_err(|e| mlua_isle::IsleError::Lua(format!("setup dispatcher: {e}")))?;
2002 use mlua::prelude::*;
2004 let handlers: LuaTable = lua
2005 .globals()
2006 .get("__mcp_roots_handlers")
2007 .map_err(|e| mlua_isle::IsleError::Lua(format!("get handlers: {e}")))?;
2008 let cb: LuaFunction = lua
2009 .load(
2010 r#"
2011 return function(server_name)
2012 return {
2013 { uri = "file:///test", name = "TestRoot" },
2014 }
2015 end
2016 "#,
2017 )
2018 .set_name("@test_roots_handler")
2019 .eval()
2020 .map_err(|e| mlua_isle::IsleError::Lua(format!("eval: {e}")))?;
2021 handlers
2022 .set(name_owned.as_str(), cb)
2023 .map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
2024 Ok(String::new())
2025 })
2026 .await
2027 .expect("isle setup must succeed");
2028
2029 let isle_arc = std::sync::Arc::new(isle);
2030
2031 let mut handler = AgentBlockClientHandler::new();
2034 handler.handler_isle = Some(std::sync::Arc::clone(&isle_arc));
2035 handler.server_name = Some(name.to_string());
2036 handler.mark_roots(name);
2037
2038 let (server_side, client_side) = tokio::io::duplex(65536);
2039 tokio::spawn(async move {
2040 if let Ok(running) = RootsTestServer.serve(server_side).await {
2041 let _ = running.waiting().await;
2042 }
2043 });
2044 let running = handler.serve(client_side).await.expect("handshake");
2045 mgr.servers.insert(name.to_string(), running);
2046
2047 driver
2048 }
2049
2050 async fn attach_roots_server_bare(mgr: &mut McpManager, name: &str) {
2053 let (server_side, client_side) = tokio::io::duplex(65536);
2054 tokio::spawn(async move {
2055 if let Ok(running) = RootsTestServer.serve(server_side).await {
2056 let _ = running.waiting().await;
2057 }
2058 });
2059 let handler = AgentBlockClientHandler::new();
2060 let running = handler.serve(client_side).await.expect("handshake");
2061 mgr.servers.insert(name.to_string(), running);
2062 }
2063
2064 #[test]
2068 fn mark_roots_sets_flag_accessible_by_handler() {
2069 let handler = AgentBlockClientHandler::new();
2070 handler.ensure_server("roots-srv");
2071 assert!(
2072 !handler
2073 .registry
2074 .lock()
2075 .unwrap()
2076 .get("roots-srv")
2077 .unwrap()
2078 .roots
2079 );
2080 handler.mark_roots("roots-srv");
2081 assert!(
2082 handler
2083 .registry
2084 .lock()
2085 .unwrap()
2086 .get("roots-srv")
2087 .unwrap()
2088 .roots
2089 );
2090 }
2091
2092 #[tokio::test]
2096 async fn notify_roots_list_changed_unknown_server_is_no_op() {
2097 let mgr = McpManager::new();
2098 mgr.notify_roots_list_changed("ghost");
2100 }
2101
2102 #[tokio::test]
2105 async fn notify_roots_list_changed_live_server_does_not_panic() {
2106 let mut mgr = McpManager::new();
2107 attach_resource_server(&mut mgr, "res").await;
2108 mgr.notify_roots_list_changed("res");
2109 tokio::time::sleep(Duration::from_millis(50)).await;
2111 }
2112
2113 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2128 async fn live_duplex_roots_round_trip() {
2129 let mut mgr = McpManager::new();
2130 let _driver = attach_roots_server_with_isle(&mut mgr, "roots").await;
2131
2132 let mgr_arc = std::sync::Arc::new(tokio::sync::RwLock::new(mgr));
2134
2135 let mgr_a = std::sync::Arc::clone(&mgr_arc);
2137 let call_handle = tokio::spawn(async move {
2138 mgr_a
2139 .read()
2140 .await
2141 .call_tool("roots", "any_tool", serde_json::json!({}))
2142 .await
2143 });
2144
2145 let mgr_b = std::sync::Arc::clone(&mgr_arc);
2147 let notify_handle = tokio::spawn(async move {
2148 tokio::time::sleep(Duration::from_millis(5)).await;
2150 mgr_b.read().await.notify_roots_list_changed("roots");
2151 });
2152
2153 let tool_result = call_handle.await.expect("call_handle must not panic");
2155 notify_handle.await.expect("notify_handle must not panic");
2156
2157 let result = tool_result.expect("call_tool must succeed");
2159 let result_json = serde_json::to_string(&result).expect("serialize result");
2160 assert!(
2161 result_json.contains("roots:1:file:///test"),
2162 "expected roots:1:file:///test in tool result: {result_json}"
2163 );
2164 }
2165
2166 #[tokio::test]
2169 async fn live_duplex_roots_no_handler_returns_error() {
2170 let mut mgr = McpManager::new();
2171 attach_roots_server_bare(&mut mgr, "roots-no-handler").await;
2174
2175 let result = mgr
2176 .call_tool("roots-no-handler", "any_tool", serde_json::json!({}))
2177 .await;
2178 assert!(
2181 result.is_err(),
2182 "call_tool must fail when no roots handler is registered: {result:?}"
2183 );
2184 }
2185
2186 #[derive(Clone)]
2193 struct ElicitationTestServer;
2194
2195 impl ServerHandler for ElicitationTestServer {
2196 fn get_info(&self) -> ServerInfo {
2197 ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
2198 }
2199
2200 async fn call_tool(
2201 &self,
2202 _params: rmcp::model::CallToolRequestParams,
2203 ctx: RequestContext<RoleServer>,
2204 ) -> Result<rmcp::model::CallToolResult, McpError> {
2205 use rmcp::model::{
2206 CreateElicitationRequestParams, ElicitationSchema, PrimitiveSchema, StringSchema,
2207 };
2208 use std::collections::BTreeMap;
2209
2210 let mut props = BTreeMap::new();
2212 props.insert(
2213 "name".to_string(),
2214 PrimitiveSchema::String(StringSchema::new()),
2215 );
2216 let schema = ElicitationSchema::new(props);
2217 let req = CreateElicitationRequestParams::FormElicitationParams {
2218 meta: None,
2219 message: "What is your name?".to_string(),
2220 requested_schema: schema,
2221 };
2222
2223 let result =
2224 ctx.peer.create_elicitation(req).await.map_err(|e| {
2225 McpError::internal_error(format!("create_elicitation: {e}"), None)
2226 })?;
2227
2228 let action_str = match result.action {
2230 rmcp::model::ElicitationAction::Accept => "accept",
2231 rmcp::model::ElicitationAction::Decline => "decline",
2232 rmcp::model::ElicitationAction::Cancel => "cancel",
2233 };
2234 let content_str = result
2235 .content
2236 .map(|v| serde_json::to_string(&v).unwrap_or_default())
2237 .unwrap_or_default();
2238 Ok(rmcp::model::CallToolResult::success(vec![
2239 rmcp::model::Content::text(format!("elicitation:{action_str}:{content_str}")),
2240 ]))
2241 }
2242 }
2243
2244 #[derive(Clone)]
2248 struct ElicitationUrlTestServer;
2249
2250 impl ServerHandler for ElicitationUrlTestServer {
2251 fn get_info(&self) -> ServerInfo {
2252 ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
2253 }
2254
2255 async fn call_tool(
2256 &self,
2257 _params: rmcp::model::CallToolRequestParams,
2258 ctx: RequestContext<RoleServer>,
2259 ) -> Result<rmcp::model::CallToolResult, McpError> {
2260 use rmcp::model::CreateElicitationRequestParams;
2261
2262 let req = CreateElicitationRequestParams::UrlElicitationParams {
2264 meta: None,
2265 message: "Please complete this form online".to_string(),
2266 url: "https://example.com/form".to_string(),
2267 elicitation_id: "test-elicitation-id-001".to_string(),
2268 };
2269
2270 let result =
2271 ctx.peer.create_elicitation(req).await.map_err(|e| {
2272 McpError::internal_error(format!("create_elicitation: {e}"), None)
2273 })?;
2274
2275 let action_str = match result.action {
2276 rmcp::model::ElicitationAction::Accept => "accept",
2277 rmcp::model::ElicitationAction::Decline => "decline",
2278 rmcp::model::ElicitationAction::Cancel => "cancel",
2279 };
2280 Ok(rmcp::model::CallToolResult::success(vec![
2281 rmcp::model::Content::text(format!("url_elicitation:{action_str}")),
2282 ]))
2283 }
2284 }
2285
2286 async fn attach_elicitation_server_with_isle(
2292 mgr: &mut McpManager,
2293 name: &str,
2294 action: &str,
2295 ) -> mlua_isle::AsyncIsleDriver {
2296 use mlua_isle::AsyncIsle;
2297
2298 let (isle, driver) = AsyncIsle::spawn(|_lua: &mlua::Lua| Ok(()))
2299 .await
2300 .expect("AsyncIsle::spawn should succeed");
2301
2302 let name_owned = name.to_string();
2303 let action_owned = action.to_string();
2304 isle.exec(move |lua| {
2305 handler::install_mcp_dispatcher_on_handler_isle(lua)
2306 .map_err(|e| mlua_isle::IsleError::Lua(format!("setup dispatcher: {e}")))?;
2307 use mlua::prelude::*;
2309 let handlers: LuaTable = lua
2310 .globals()
2311 .get("__mcp_elicitation_handlers")
2312 .map_err(|e| mlua_isle::IsleError::Lua(format!("get handlers: {e}")))?;
2313
2314 let handler_src = match action_owned.as_str() {
2316 "accept" => {
2317 r#"
2318 return function(server_name, message, schema_json)
2319 return { action = "accept", content = { name = "Alice" } }
2320 end
2321 "#
2322 }
2323 "decline" => {
2324 r#"
2325 return function(server_name, message, schema_json)
2326 return { action = "decline" }
2327 end
2328 "#
2329 }
2330 "cancel" => {
2331 r#"
2332 return function(server_name, message, schema_json)
2333 return { action = "cancel" }
2334 end
2335 "#
2336 }
2337 _ => {
2338 r#"
2339 return function(server_name, message, schema_json)
2340 return { action = "decline" }
2341 end
2342 "#
2343 }
2344 };
2345
2346 let cb: LuaFunction = lua
2347 .load(handler_src)
2348 .set_name("@test_elicitation_handler")
2349 .eval()
2350 .map_err(|e| mlua_isle::IsleError::Lua(format!("eval: {e}")))?;
2351 handlers
2352 .set(name_owned.as_str(), cb)
2353 .map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
2354 Ok(String::new())
2355 })
2356 .await
2357 .expect("isle setup must succeed");
2358
2359 let isle_arc = std::sync::Arc::new(isle);
2360
2361 let mut handler = AgentBlockClientHandler::new();
2362 handler.handler_isle = Some(std::sync::Arc::clone(&isle_arc));
2363 handler.server_name = Some(name.to_string());
2364 handler.mark_elicitation(name);
2365
2366 let (server_side, client_side) = tokio::io::duplex(65536);
2367 tokio::spawn(async move {
2368 if let Ok(running) = ElicitationTestServer.serve(server_side).await {
2369 let _ = running.waiting().await;
2370 }
2371 });
2372 let running = handler.serve(client_side).await.expect("handshake");
2373 mgr.servers.insert(name.to_string(), running);
2374
2375 driver
2376 }
2377
2378 async fn attach_elicitation_server_bare(mgr: &mut McpManager, name: &str) {
2383 let (server_side, client_side) = tokio::io::duplex(65536);
2384 tokio::spawn(async move {
2385 if let Ok(running) = ElicitationTestServer.serve(server_side).await {
2386 let _ = running.waiting().await;
2387 }
2388 });
2389 let mut handler = AgentBlockClientHandler::new();
2390 handler.ensure_server(name);
2391 handler.server_name = Some(name.to_string());
2392 let running = handler.serve(client_side).await.expect("handshake");
2393 mgr.servers.insert(name.to_string(), running);
2394 }
2395
2396 async fn attach_elicitation_url_server(mgr: &mut McpManager, name: &str) {
2399 let (server_side, client_side) = tokio::io::duplex(65536);
2400 tokio::spawn(async move {
2401 if let Ok(running) = ElicitationUrlTestServer.serve(server_side).await {
2402 let _ = running.waiting().await;
2403 }
2404 });
2405 let handler = AgentBlockClientHandler::new();
2406 let running = handler.serve(client_side).await.expect("handshake");
2407 mgr.servers.insert(name.to_string(), running);
2408 }
2409
2410 #[test]
2414 fn mark_elicitation_sets_flag_accessible_by_handler() {
2415 let handler = AgentBlockClientHandler::new();
2416 handler.ensure_server("elicit-srv");
2417 assert!(
2418 !handler
2419 .registry
2420 .lock()
2421 .unwrap()
2422 .get("elicit-srv")
2423 .unwrap()
2424 .elicitation
2425 );
2426 handler.mark_elicitation("elicit-srv");
2427 assert!(
2428 handler
2429 .registry
2430 .lock()
2431 .unwrap()
2432 .get("elicit-srv")
2433 .unwrap()
2434 .elicitation
2435 );
2436 }
2437
2438 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2442 async fn elicitation_accept_returns_accept_with_content() {
2443 let mut mgr = McpManager::new();
2444 let _driver = attach_elicitation_server_with_isle(&mut mgr, "elicit", "accept").await;
2445
2446 let result = mgr
2447 .call_tool("elicit", "any_tool", serde_json::json!({}))
2448 .await
2449 .expect("call_tool must succeed");
2450 let result_json = serde_json::to_string(&result).expect("serialize result");
2451 assert!(
2452 result_json.contains("elicitation:accept:"),
2453 "expected elicitation:accept: in result: {result_json}"
2454 );
2455 assert!(
2456 result_json.contains("Alice"),
2457 "expected Alice in content: {result_json}"
2458 );
2459 }
2460
2461 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2463 async fn elicitation_decline_returns_decline() {
2464 let mut mgr = McpManager::new();
2465 let _driver = attach_elicitation_server_with_isle(&mut mgr, "elicit", "decline").await;
2466
2467 let result = mgr
2468 .call_tool("elicit", "any_tool", serde_json::json!({}))
2469 .await
2470 .expect("call_tool must succeed");
2471 let result_json = serde_json::to_string(&result).expect("serialize result");
2472 assert!(
2473 result_json.contains("elicitation:decline:"),
2474 "expected elicitation:decline: in result: {result_json}"
2475 );
2476 }
2477
2478 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2480 async fn elicitation_cancel_returns_cancel() {
2481 let mut mgr = McpManager::new();
2482 let _driver = attach_elicitation_server_with_isle(&mut mgr, "elicit", "cancel").await;
2483
2484 let result = mgr
2485 .call_tool("elicit", "any_tool", serde_json::json!({}))
2486 .await
2487 .expect("call_tool must succeed");
2488 let result_json = serde_json::to_string(&result).expect("serialize result");
2489 assert!(
2490 result_json.contains("elicitation:cancel:"),
2491 "expected elicitation:cancel: in result: {result_json}"
2492 );
2493 }
2494
2495 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2497 async fn elicitation_url_variant_always_declines() {
2498 let mut mgr = McpManager::new();
2499 attach_elicitation_url_server(&mut mgr, "elicit-url").await;
2500
2501 let result = mgr
2502 .call_tool("elicit-url", "any_tool", serde_json::json!({}))
2503 .await
2504 .expect("call_tool must succeed");
2505 let result_json = serde_json::to_string(&result).expect("serialize result");
2506 assert!(
2507 result_json.contains("url_elicitation:decline"),
2508 "expected url_elicitation:decline in result: {result_json}"
2509 );
2510 }
2511
2512 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2514 async fn elicitation_no_handler_returns_decline() {
2515 let mut mgr = McpManager::new();
2516 attach_elicitation_server_bare(&mut mgr, "elicit-bare").await;
2518
2519 let result = mgr
2520 .call_tool("elicit-bare", "any_tool", serde_json::json!({}))
2521 .await
2522 .expect("call_tool must succeed — no handler → Decline, not error");
2523 let result_json = serde_json::to_string(&result).expect("serialize result");
2524 assert!(
2525 result_json.contains("elicitation:decline:"),
2526 "expected elicitation:decline: when no handler registered: {result_json}"
2527 );
2528 }
2529
2530 #[derive(Clone)]
2536 struct SlowPingServer {
2537 delay: Duration,
2538 }
2539
2540 impl ServerHandler for SlowPingServer {
2541 fn get_info(&self) -> ServerInfo {
2542 ServerInfo::new(ServerCapabilities::builder().build())
2543 }
2544
2545 async fn ping(&self, _ctx: RequestContext<RoleServer>) -> Result<(), McpError> {
2546 tokio::time::sleep(self.delay).await;
2547 Ok(())
2548 }
2549 }
2550
2551 async fn attach_slow_ping_server(mgr: &mut McpManager, name: &str, delay: Duration) {
2553 let (server_side, client_side) = tokio::io::duplex(8192);
2554 let server = SlowPingServer { delay };
2555 tokio::spawn(async move {
2556 if let Ok(running) = server.serve(server_side).await {
2557 let _ = running.waiting().await;
2558 }
2559 });
2560 let handler = AgentBlockClientHandler::new();
2561 let running = handler
2562 .serve(client_side)
2563 .await
2564 .expect("client handshake should succeed over duplex");
2565 mgr.servers.insert(name.to_string(), running);
2566 }
2567
2568 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2573 async fn ping_success_returns_latency_ms() {
2574 let mut mgr = McpManager::new();
2575 attach_slow_ping_server(&mut mgr, "pingsrv", Duration::from_millis(0)).await;
2577
2578 let result = mgr.ping("pingsrv").await;
2579 let latency_ms = result.expect("ping should succeed against a live server");
2580 assert!(
2584 latency_ms <= 5000,
2585 "latency_ms={latency_ms} looks unreasonable (> 5 s)"
2586 );
2587 }
2588
2589 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2592 async fn ping_timeout_returns_block_error_timeout() {
2593 let mut mgr = McpManager::with_rpc_timeout(Duration::from_millis(1))
2594 .expect("with_rpc_timeout(1ms) should succeed");
2595 attach_slow_ping_server(&mut mgr, "slowping", Duration::from_millis(200)).await;
2597
2598 let result = mgr.ping("slowping").await;
2599 let err = result.expect_err("ping should time out");
2600 assert!(
2601 matches!(err, BlockError::Timeout(_)),
2602 "expected BlockError::Timeout, got: {err:?}"
2603 );
2604 let msg = err.to_string();
2605 assert!(
2606 msg.contains("timed out"),
2607 "timeout message should contain 'timed out': {msg}"
2608 );
2609 }
2610}