1use std::sync::Arc;
14use std::time::Duration;
15
16use arc_swap::ArcSwap;
17use dashmap::DashMap;
18use serde_json::Value;
19use tokio::sync::mpsc;
20
21use crate::content::types::Content;
22use crate::managers::cancellation::{CancellationManager, CancelledNotificationParams};
23use crate::managers::completion::CompletionManager;
24use crate::managers::subscription::{SubscribeRequest, SubscriptionManager, UnsubscribeRequest};
25use crate::protocol::capabilities::{InitializeRequest, InitializeResult, ServerCapabilities};
26use crate::protocol::errors::{ErrorType, McpError, codes};
27use crate::protocol::methods::McpMethod;
28use crate::protocol::types::{Implementation, JsonRpcRequest, JsonRpcResponse};
29use crate::protocol::version;
30use crate::registry::prompts::PromptManager;
31use crate::registry::resources::ResourceManager;
32use crate::registry::tools::{ToolError, ToolRegistry};
33use crate::server::handler::{
34 RequestContext, error_response, require_initialization, success_response,
35};
36use crate::server::middleware::MiddlewareChain;
37use crate::server::multiplexer::{
38 ClientRequester, CreateMessageParams, CreateMessageResult, JsonRpcClientRequest,
39 ListRootsResult, MultiplexerError, RequestMultiplexer, Root,
40};
41use crate::server::session::Session;
42use crate::server::visibility::{Environment, VisibilityContext};
43use crate::transport::traits::{IncomingMessage, JsonRpcNotification, Transport};
44
45fn tool_error_to_mcp_error(tool_name: &str, err: ToolError) -> McpError {
46 match err {
47 ToolError::NotFound(_) => McpError::not_found(
48 codes::TOOL_NOT_FOUND,
49 format!("Tool not found: {}", tool_name),
50 ),
51 ToolError::InvalidArguments(message) => {
52 McpError::validation(codes::INVALID_TOOL_ARGS, message)
53 }
54 ToolError::Execution(message) | ToolError::Internal(message) => {
55 McpError::internal(codes::TOOL_EXECUTION_FAILED, message)
56 }
57 ToolError::CircuitOpen { tool, message } => McpError::builder(
58 ErrorType::CircuitOpen,
59 codes::CIRCUIT_BREAKER_OPEN,
60 )
61 .message(message)
62 .detail("tool", tool)
63 .build(),
64 }
65}
66
/// MCP server state: identity, advertised capabilities, per-session data,
/// registries, and the channels used to talk to the connected client.
pub struct Server {
    /// Server name reported in the `initialize` response.
    name: String,

    /// Server version reported in the `initialize` response.
    version: String,

    /// Optional instructions returned in the `initialize` response;
    /// replaceable at runtime through `set_instructions`.
    instructions: Arc<ArcSwap<Option<String>>>,

    /// Capabilities advertised in the `initialize` response; replaceable at
    /// runtime through `set_capabilities`.
    capabilities: Arc<ArcSwap<ServerCapabilities>>,

    /// Sessions keyed by session id; entries are created on first request.
    sessions: DashMap<String, Session>,

    /// Middleware applied to every request before it is dispatched.
    middleware: MiddlewareChain,

    /// Registry used to list and call tools.
    tool_registry: ToolRegistry,

    /// Manager used to list and read resources and resource templates.
    resource_manager: ResourceManager,

    /// Manager used to list and render prompts.
    prompt_manager: PromptManager,

    /// Sending half of the outbound notification queue. Clones are handed to
    /// the logger, the subscription manager, and every new session.
    notification_tx: mpsc::UnboundedSender<JsonRpcNotification>,

    /// Receiving half of the notification queue, drained by `run` and
    /// forwarded to the transport.
    notification_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<JsonRpcNotification>>>,

    /// Logger constructed over a clone of `notification_tx`.
    logger: crate::logging::McpLogger,

    /// Tracks pending server-to-client requests and routes client responses
    /// back to whoever is waiting on them.
    multiplexer: Arc<RequestMultiplexer>,

    /// Sending half of the server-to-client request queue.
    request_tx: mpsc::UnboundedSender<JsonRpcClientRequest>,

    /// Receiving half of the server-to-client request queue, drained by `run`
    /// and forwarded to the transport.
    request_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<JsonRpcClientRequest>>>,

    /// Store backing task-augmented tool calls and the `tasks/*` handlers.
    task_store: Arc<crate::managers::task::TaskStore>,

    /// Cancellation bookkeeping; everything it tracks is cancelled when `run`
    /// shuts down.
    cancellation_manager: CancellationManager,

    /// Per-session resource subscriptions and update notifications.
    subscription_manager: SubscriptionManager,

    /// Completion manager exposed through `completion_manager()`.
    /// NOTE(review): presumably backs `completion/complete`; that handler is
    /// not visible in this part of the file.
    completion_manager: CompletionManager,

    /// Optional environment consulted when building visibility contexts.
    environment: Option<Arc<dyn Environment>>,
}
129
130impl Server {
131 pub fn new(name: impl Into<String>, version: impl Into<String>) -> Self {
133 let (notification_tx, notification_rx) = mpsc::unbounded_channel();
134 let (request_tx, request_rx) = mpsc::unbounded_channel();
135
136 let task_store = Arc::new(crate::managers::task::TaskStore::new(
138 std::time::Duration::from_secs(300),
139 std::time::Duration::from_secs(5),
140 ));
141
142 if tokio::runtime::Handle::try_current().is_ok() {
144 task_store
145 .clone()
146 .spawn_cleanup_task(std::time::Duration::from_secs(60));
147 }
148
149 let default_caps = ServerCapabilities {
151 tasks: Some(crate::protocol::capabilities::TasksCapability {
152 list: Some(crate::protocol::capabilities::EmptyObject {}),
153 cancel: Some(crate::protocol::capabilities::EmptyObject {}),
154 requests: Some(crate::protocol::capabilities::TasksRequestsCapability {
155 tools: Some(crate::protocol::capabilities::TasksToolsCapability {
156 call: Some(crate::protocol::capabilities::EmptyObject {}),
157 }),
158 ..Default::default()
159 }),
160 }),
161 ..Default::default()
162 };
163
164 let logger = crate::logging::McpLogger::new(notification_tx.clone(), "mcp-server");
166
167 Self {
168 name: name.into(),
169 version: version.into(),
170 instructions: Arc::new(ArcSwap::new(Arc::new(None))),
171 capabilities: Arc::new(ArcSwap::new(Arc::new(default_caps))),
172 sessions: DashMap::new(),
173 middleware: MiddlewareChain::new(),
174 tool_registry: ToolRegistry::new(),
175 resource_manager: ResourceManager::new(),
176 prompt_manager: PromptManager::new(),
177 notification_tx: notification_tx.clone(),
178 notification_rx: Arc::new(tokio::sync::Mutex::new(notification_rx)),
179 logger,
180 multiplexer: Arc::new(RequestMultiplexer::new()),
181 request_tx,
182 request_rx: Arc::new(tokio::sync::Mutex::new(request_rx)),
183 task_store,
184 cancellation_manager: CancellationManager::new(),
185 subscription_manager: SubscriptionManager::with_notifications(notification_tx),
186 completion_manager: CompletionManager::new(),
187 environment: None,
188 }
189 }
190
191 pub fn name(&self) -> &str {
193 &self.name
194 }
195
196 pub fn version(&self) -> &str {
198 &self.version
199 }
200
    /// Returns the capabilities currently stored on the server as a shared
    /// snapshot; later calls to `set_capabilities` do not change it.
    pub fn capabilities(&self) -> Arc<ServerCapabilities> {
        self.capabilities.load_full()
    }
205
206 pub fn set_capabilities(&self, capabilities: ServerCapabilities) {
208 self.capabilities.store(Arc::new(capabilities));
209 }
210
211 pub fn instructions(&self) -> Option<String> {
213 (**self.instructions.load()).clone()
214 }
215
216 pub fn set_instructions(&self, instructions: Option<String>) {
218 self.instructions.store(Arc::new(instructions));
219 }
220
    /// Installs the environment used when building visibility contexts,
    /// replacing any previously installed one.
    pub fn set_environment(&mut self, environment: Arc<dyn Environment>) {
        self.environment = Some(environment);
    }
225
226 fn visibility_context<'a>(&'a self, session: &'a Session) -> VisibilityContext<'a> {
227 match self.environment.as_deref() {
228 Some(env) => VisibilityContext::with_environment(session, env),
229 None => VisibilityContext::new(session),
230 }
231 }
232
    /// Adds `middleware` to the chain that every request passes through
    /// before it is dispatched.
    pub fn add_middleware(&mut self, middleware: crate::server::middleware::MiddlewareFn) {
        self.middleware.add(middleware);
    }
237
    /// Returns the registry used to list and call tools.
    pub fn tool_registry(&self) -> &ToolRegistry {
        &self.tool_registry
    }
242
    /// Returns the manager used to list and read resources.
    pub fn resource_manager(&self) -> &ResourceManager {
        &self.resource_manager
    }
247
    /// Returns the manager used to list and render prompts.
    pub fn prompt_manager(&self) -> &PromptManager {
        &self.prompt_manager
    }
252
    /// Returns the manager that tracks per-session resource subscriptions.
    pub fn subscription_manager(&self) -> &SubscriptionManager {
        &self.subscription_manager
    }
257
    /// Returns the server's completion manager.
    pub fn completion_manager(&self) -> &CompletionManager {
        &self.completion_manager
    }
262
    /// Returns the server logger, which is also the one handed to tool,
    /// resource, and prompt invocations.
    pub fn logger(&self) -> &crate::logging::McpLogger {
        &self.logger
    }
267
268 pub fn notification_sender(&self) -> mpsc::UnboundedSender<JsonRpcNotification> {
270 self.notification_tx.clone()
271 }
272
273 pub fn send_notification(
275 &self,
276 method: impl Into<String>,
277 params: Option<Value>,
278 ) -> Result<(), Box<dyn std::error::Error>> {
279 let notification = JsonRpcNotification::new(method, params);
280 self.notification_tx.send(notification)?;
281 Ok(())
282 }
283
284 pub fn get_session(&self, session_id: &str) -> Option<Session> {
286 self.sessions.get(session_id).map(|s| s.clone())
287 }
288
289 pub fn remove_session(&self, session_id: &str) -> Option<Session> {
293 self.subscription_manager.remove_session(session_id);
295 self.sessions.remove(session_id).map(|(_, s)| s)
296 }
297
    /// Reports that the resource at `uri` changed by delegating to the
    /// subscription manager.
    pub fn notify_resource_updated(&self, uri: &str) {
        self.subscription_manager.notify_resource_updated(uri);
    }
312
    /// Reports that every resource in `uris` changed by delegating to the
    /// subscription manager.
    pub fn notify_resources_updated(&self, uris: &[&str]) {
        self.subscription_manager.notify_resources_updated(uris);
    }
319
320 pub fn multiplexer(&self) -> Arc<RequestMultiplexer> {
322 self.multiplexer.clone()
323 }
324
    /// Returns the server's cancellation manager.
    pub fn cancellation_manager(&self) -> &CancellationManager {
        &self.cancellation_manager
    }
331
332 pub fn create_client_requester(&self, session_id: &str) -> Option<ClientRequester> {
337 let session = self.get_session(session_id)?;
338 let caps = session.capabilities.as_ref()?;
339
340 Some(ClientRequester::new(
341 self.request_tx.clone(),
342 self.multiplexer.clone(),
343 caps.roots.is_some(),
344 caps.sampling.is_some(),
345 caps.elicitation.is_some(),
346 ))
347 }
348
349 pub async fn request_roots(
375 &self,
376 session_id: &str,
377 timeout: Option<Duration>,
378 ) -> Result<Vec<Root>, MultiplexerError> {
379 if let Some(session) = self.get_session(session_id) {
381 if let Some(caps) = &session.capabilities {
382 if caps.roots.is_none() {
383 return Err(MultiplexerError::UnsupportedCapability("roots".to_string()));
384 }
385 } else {
386 return Err(MultiplexerError::UnsupportedCapability("roots".to_string()));
387 }
388 } else {
389 return Err(MultiplexerError::Transport("session not found".to_string()));
390 }
391
392 let (id, rx) = self.multiplexer.create_pending("roots/list");
394
395 let request = JsonRpcClientRequest::new(&id, "roots/list", Some(serde_json::json!({})));
397
398 self.request_tx
399 .send(request)
400 .map_err(|e| MultiplexerError::Transport(e.to_string()))?;
401
402 let timeout = timeout.unwrap_or(self.multiplexer.default_timeout());
404 let result = tokio::time::timeout(timeout, rx)
405 .await
406 .map_err(|_| MultiplexerError::Timeout(timeout))?
407 .map_err(|_| MultiplexerError::ChannelClosed)??;
408
409 let list_result: ListRootsResult = serde_json::from_value(result)?;
411 Ok(list_result.roots)
412 }
413
414 pub async fn request_sampling(
450 &self,
451 session_id: &str,
452 params: CreateMessageParams,
453 timeout: Option<Duration>,
454 ) -> Result<CreateMessageResult, MultiplexerError> {
455 if let Some(session) = self.get_session(session_id) {
457 if let Some(caps) = &session.capabilities {
458 if caps.sampling.is_none() {
459 return Err(MultiplexerError::UnsupportedCapability(
460 "sampling".to_string(),
461 ));
462 }
463 } else {
464 return Err(MultiplexerError::UnsupportedCapability(
465 "sampling".to_string(),
466 ));
467 }
468 } else {
469 return Err(MultiplexerError::Transport("session not found".to_string()));
470 }
471
472 let (id, rx) = self.multiplexer.create_pending("sampling/createMessage");
474
475 let params_value = serde_json::to_value(¶ms)?;
477 let request = JsonRpcClientRequest::new(&id, "sampling/createMessage", Some(params_value));
478
479 self.request_tx
480 .send(request)
481 .map_err(|e| MultiplexerError::Transport(e.to_string()))?;
482
483 let timeout = timeout.unwrap_or(self.multiplexer.default_timeout());
485 let result = tokio::time::timeout(timeout, rx)
486 .await
487 .map_err(|_| MultiplexerError::Timeout(timeout))?
488 .map_err(|_| MultiplexerError::ChannelClosed)??;
489
490 let create_result: CreateMessageResult = serde_json::from_value(result)?;
492 Ok(create_result)
493 }
494
495 pub async fn request_elicitation(
546 &self,
547 session_id: &str,
548 message: impl Into<String>,
549 requested_schema: Value,
550 timeout: Option<Duration>,
551 ) -> Result<crate::protocol::types::CreateElicitationResult, MultiplexerError> {
552 if let Some(session) = self.get_session(session_id) {
554 if let Some(caps) = &session.capabilities {
555 if caps.elicitation.is_none() {
556 return Err(MultiplexerError::UnsupportedCapability(
557 "elicitation".to_string(),
558 ));
559 }
560 } else {
561 return Err(MultiplexerError::UnsupportedCapability(
562 "elicitation".to_string(),
563 ));
564 }
565 } else {
566 return Err(MultiplexerError::Transport("session not found".to_string()));
567 }
568
569 let (id, rx) = self.multiplexer.create_pending("elicitation/create");
571
572 let params = serde_json::json!({
574 "message": message.into(),
575 "requestedSchema": requested_schema,
576 });
577 let request = JsonRpcClientRequest::new(&id, "elicitation/create", Some(params));
578
579 self.request_tx
580 .send(request)
581 .map_err(|e| MultiplexerError::Transport(e.to_string()))?;
582
583 let timeout = timeout.unwrap_or(self.multiplexer.default_timeout());
585 let result = tokio::time::timeout(timeout, rx)
586 .await
587 .map_err(|_| MultiplexerError::Timeout(timeout))?
588 .map_err(|_| MultiplexerError::ChannelClosed)??;
589
590 let elicitation_result: crate::protocol::types::CreateElicitationResult =
592 serde_json::from_value(result)?;
593 Ok(elicitation_result)
594 }
595
596 pub async fn run<T: Transport>(
605 &self,
606 mut transport: T,
607 ) -> Result<(), Box<dyn std::error::Error>> {
608 let session_id = uuid::Uuid::new_v4().to_string();
610
611 let mut notification_rx = self.notification_rx.lock().await;
613 let mut request_rx = self.request_rx.lock().await;
614
615 loop {
616 tokio::select! {
617 Some(notification) = notification_rx.recv() => {
619 tracing::debug!(method = %notification.method, "Sending notification");
620 if let Err(e) = transport.send_notification(notification).await {
621 tracing::error!(error = %e, "Failed to send notification");
622 }
623 }
624
625 Some(request) = request_rx.recv() => {
627 tracing::debug!(method = %request.method, id = %request.id, "Sending request to client");
628 if let Err(e) = transport.send_request(request).await {
629 tracing::error!(error = %e, "Failed to send request to client");
630 }
631 }
632
633 result = transport.read_incoming() => {
635 match result {
636 Ok(IncomingMessage::Request(request)) => {
637 let is_notification = request.id.is_none();
639
640 let response = self.handle_request(&session_id, request).await;
642
643 if !is_notification
646 && let Err(e) = transport.write_message(response).await {
647 tracing::error!(error = %e, "Failed to write message");
648 break;
649 }
650 }
651 Ok(IncomingMessage::Response(response)) => {
652 if !self.multiplexer.route_response(&response) {
654 tracing::warn!(
655 id = ?response.id,
656 "Received response for unknown request ID"
657 );
658 }
659 }
660 Err(crate::transport::traits::TransportError::Closed) => {
661 tracing::info!("Transport closed, shutting down");
662 break;
663 }
664 Err(e) => {
665 tracing::error!(error = %e, "Failed to read message");
666 continue;
667 }
668 }
669 }
670 }
671 }
672
673 self.multiplexer.cancel_all();
675
676 self.cancellation_manager.cancel_all();
678
679 transport.shutdown().await?;
681
682 Ok(())
683 }
684
685 pub async fn handle_request(
687 &self,
688 session_id: &str,
689 request: JsonRpcRequest,
690 ) -> JsonRpcResponse {
691 let session = self
693 .sessions
694 .entry(session_id.to_string())
695 .or_insert_with(|| {
696 let mut session = Session::with_id(session_id);
697 session.set_notification_channel(self.notification_tx.clone());
698 session
699 })
700 .clone();
701
702 let ctx = RequestContext::new(session, request.clone());
704
705 let ctx = match self.middleware.process(ctx) {
707 Ok(ctx) => ctx,
708 Err(err) => return error_response(request.id, err.to_jsonrpc()),
709 };
710
711 let method = McpMethod::from(request.method.clone());
713
714 match method {
715 McpMethod::Initialize => self.handle_initialize(ctx).await,
716 McpMethod::Ping => self.handle_ping(ctx).await,
717 McpMethod::LoggingSetLevel => self.handle_logging_set_level(ctx).await,
718 McpMethod::ToolsList => self.handle_tools_list(ctx).await,
719 McpMethod::ToolsCall => self.handle_tools_call(ctx).await,
720 McpMethod::ResourcesList => self.handle_resources_list(ctx).await,
721 McpMethod::ResourcesTemplatesList => self.handle_resources_templates_list(ctx).await,
722 McpMethod::ResourcesRead => self.handle_resources_read(ctx).await,
723 McpMethod::ResourcesSubscribe => self.handle_resources_subscribe(ctx).await,
724 McpMethod::ResourcesUnsubscribe => self.handle_resources_unsubscribe(ctx).await,
725 McpMethod::PromptsList => self.handle_prompts_list(ctx).await,
726 McpMethod::PromptsGet => self.handle_prompts_get(ctx).await,
727 McpMethod::RootsList => self.handle_roots_list(ctx).await,
728 McpMethod::SamplingCreateMessage => self.handle_sampling_create_message(ctx).await,
729 McpMethod::ElicitationCreate => self.handle_elicitation_create(ctx).await,
730 McpMethod::TasksGet => self.handle_tasks_get(ctx).await,
731 McpMethod::TasksResult => self.handle_tasks_result(ctx).await,
732 McpMethod::TasksList => self.handle_tasks_list(ctx).await,
733 McpMethod::TasksCancel => self.handle_tasks_cancel(ctx).await,
734 McpMethod::CompletionComplete => self.handle_completion_complete(ctx).await,
735 McpMethod::NotificationsCancelled => self.handle_notifications_cancelled(ctx).await,
736 _ => error_response(
737 request.id,
738 McpError::method_not_found(&request.method).to_jsonrpc(),
739 ),
740 }
741 }
742
743 async fn handle_initialize(&self, ctx: RequestContext) -> JsonRpcResponse {
745 let params = ctx.params().cloned().unwrap_or(Value::Null);
747 let req: InitializeRequest = match serde_json::from_value(params) {
748 Ok(req) => req,
749 Err(_) => {
750 return error_response(
751 ctx.request.id,
752 McpError::validation("invalid_params", "Invalid initialize parameters")
753 .to_jsonrpc(),
754 );
755 }
756 };
757
758 let protocol_version = match version::negotiate_protocol_version(&req.protocol_version) {
760 Ok(version) => version,
761 Err(supported_versions) => {
762 tracing::warn!(
763 client = %req.client_info.name,
764 requested = %req.protocol_version,
765 supported = ?supported_versions,
766 "Unsupported protocol version"
767 );
768 return error_response(
769 ctx.request.id,
770 McpError::builder(ErrorType::Validation, "unsupported_protocol_version")
771 .message("Unsupported protocol version")
772 .detail(
773 "supported",
774 serde_json::to_value(&supported_versions).unwrap(),
775 )
776 .detail("requested", req.protocol_version.clone())
777 .build()
778 .to_jsonrpc(),
779 );
780 }
781 };
782
783 tracing::info!(
785 client = %req.client_info.name,
786 version = %req.client_info.version,
787 protocol = %protocol_version,
788 "Client connected"
789 );
790
791 if let Some(mut session) = self.sessions.get_mut(&ctx.session.id) {
793 session.initialize(req.client_info, req.capabilities, protocol_version.clone());
794 }
795
796 let result = InitializeResult {
798 protocol_version,
799 capabilities: (**self.capabilities.load()).clone(),
800 server_info: Implementation {
801 name: self.name.clone(),
802 version: self.version.clone(),
803 },
804 instructions: self.instructions(),
805 };
806
807 success_response(
808 ctx.request.id,
809 serde_json::to_value(result).expect("Failed to serialize InitializeResult"),
810 )
811 }
812
813 async fn handle_ping(&self, ctx: RequestContext) -> JsonRpcResponse {
815 success_response(ctx.request.id, serde_json::json!({}))
816 }
817
818 async fn handle_logging_set_level(&self, ctx: RequestContext) -> JsonRpcResponse {
820 use crate::logging::LogLevel;
821 use crate::protocol::types::SetLevelRequest;
822
823 let params = ctx.params().cloned().unwrap_or(Value::Null);
825 let req: SetLevelRequest = match serde_json::from_value(params) {
826 Ok(req) => req,
827 Err(_) => {
828 return error_response(
829 ctx.request.id,
830 McpError::validation("invalid_params", "Invalid setLevel parameters")
831 .to_jsonrpc(),
832 );
833 }
834 };
835
836 let level = match req.level.parse::<LogLevel>() {
838 Ok(level) => level,
839 Err(_) => {
840 return error_response(
841 ctx.request.id,
842 McpError::validation(
843 "invalid_level",
844 format!(
845 "Invalid log level '{}'. Valid levels: debug, info, notice, warning, error, critical, alert, emergency",
846 req.level
847 ),
848 )
849 .to_jsonrpc(),
850 )
851 }
852 };
853
854 self.logger.set_min_level(level);
856
857 tracing::debug!(level = %req.level, "Log level updated");
858
859 success_response(ctx.request.id, serde_json::json!({}))
860 }
861
862 async fn handle_tools_list(&self, ctx: RequestContext) -> JsonRpcResponse {
864 if let Err(err) = require_initialization(&ctx) {
865 return error_response(ctx.request.id, err.to_jsonrpc());
866 }
867
868 let cursor = ctx
870 .params()
871 .and_then(|p| p.get("cursor"))
872 .and_then(|c| c.as_str());
873
874 let visibility_ctx = self.visibility_context(&ctx.session);
875 let all_tools = self
876 .tool_registry
877 .list_for_session(&ctx.session, &visibility_ctx);
878
879 let result = crate::utils::paginate(&all_tools, cursor, crate::utils::DEFAULT_PAGE_SIZE);
881
882 let mut response = serde_json::json!({"tools": result.items});
884 if let Some(next) = result.next_cursor {
885 response["nextCursor"] = serde_json::Value::String(next);
886 }
887 success_response(ctx.request.id, response)
888 }
889
890 async fn handle_tools_call(&self, ctx: RequestContext) -> JsonRpcResponse {
892 if let Err(err) = require_initialization(&ctx) {
893 return error_response(ctx.request.id, err.to_jsonrpc());
894 }
895
896 let params = ctx.params().cloned().unwrap_or(Value::Null);
898 let tool_name = match params.get("name").and_then(|v| v.as_str()) {
899 Some(name) => name,
900 None => {
901 return error_response(
902 ctx.request.id,
903 McpError::validation("invalid_params", "Missing 'name' field").to_jsonrpc(),
904 );
905 }
906 };
907
908 let tool_params = params.get("arguments").cloned().unwrap_or(Value::Null);
909
910 let task_meta: Option<crate::protocol::types::TaskMetadata> = params
912 .get("task")
913 .and_then(|t| serde_json::from_value(t.clone()).ok());
914
915 if let Some(task_metadata) = task_meta {
916 return self
918 .handle_task_augmented_tool_call(ctx, tool_name, tool_params, task_metadata)
919 .await;
920 }
921
922 let client_requester = self.create_client_requester(&ctx.session.id);
924 let visibility_ctx = self.visibility_context(&ctx.session);
925
926 match self
927 .tool_registry
928 .call_for_session(
929 tool_name,
930 tool_params,
931 &ctx.session,
932 &self.logger,
933 &visibility_ctx,
934 client_requester,
935 )
936 .await
937 {
938 Ok(output) => {
939 let result = match output {
940 crate::registry::tools::ToolOutput::Content(items) => {
941 let content_values: Vec<Value> =
942 items.iter().map(|c| c.to_value()).collect();
943 serde_json::json!({"content": content_values})
944 }
945 crate::registry::tools::ToolOutput::Structured(value) => {
946 serde_json::json!({"structuredContent": value})
947 }
948 };
949 success_response(ctx.request.id, result)
950 }
951 Err(e) => error_response(ctx.request.id, tool_error_to_mcp_error(tool_name, e).to_jsonrpc()),
952 }
953 }
954
    /// Runs a tool call as a background task and replies immediately with the
    /// created task.
    ///
    /// A task is registered in the task store for the session, the tool
    /// invocation is spawned onto the runtime, and the response carries the
    /// task descriptor. When the tool finishes, the spawned future records
    /// the task status (`Completed` or `Failed`) and then stores the result
    /// payload. A failure is stored as a text content item with
    /// `isError: true`.
    ///
    /// No initialization check happens here; the only caller visible in this
    /// file (`handle_tools_call`) performs it first.
    async fn handle_task_augmented_tool_call(
        &self,
        ctx: RequestContext,
        tool_name: &str,
        tool_params: Value,
        task_metadata: crate::protocol::types::TaskMetadata,
    ) -> JsonRpcResponse {
        // The receiver half returned by `create_task` is not used here.
        let (task, _result_rx) =
            self.task_store
                .create_task(&ctx.session.id, ctx.request.clone(), task_metadata.ttl);

        let task_id = task.task_id.clone();

        // The spawned future must be 'static, so it gets owned copies of
        // everything it needs.
        let task_store = self.task_store.clone();
        let tool_registry = self.tool_registry.clone();
        let logger = self.logger.clone();
        let session = ctx.session.clone();
        let environment = self.environment.clone();
        let client_requester = self.create_client_requester(&ctx.session.id);
        let tool_name = tool_name.to_string();

        tokio::spawn(async move {
            // Same construction as `Server::visibility_context`, rebuilt here
            // because `self` cannot be borrowed inside the spawned future.
            let visibility_ctx = match environment.as_deref() {
                Some(env) => VisibilityContext::with_environment(&session, env),
                None => VisibilityContext::new(&session),
            };

            match tool_registry
                .call_for_session(
                    &tool_name,
                    tool_params,
                    &session,
                    &logger,
                    &visibility_ctx,
                    client_requester,
                )
                .await
            {
                Ok(output) => {
                    let result = match output {
                        crate::registry::tools::ToolOutput::Content(items) => {
                            let content_values: Vec<Value> =
                                items.iter().map(|c| c.to_value()).collect();
                            serde_json::json!({"content": content_values})
                        }
                        crate::registry::tools::ToolOutput::Structured(value) => {
                            serde_json::json!({"structuredContent": value})
                        }
                    };

                    // NOTE(review): the status is updated before the result is
                    // stored, so a client polling in between may see
                    // `Completed` with no result yet — confirm against
                    // `TaskStore` semantics. Store errors are ignored.
                    let _ = task_store
                        .update_status(
                            &task_id,
                            crate::protocol::types::TaskStatus::Completed,
                            None,
                        )
                        .await;
                    let _ = task_store.store_result(&task_id, result).await;
                }
                Err(e) => {
                    let error_message = e.to_string();
                    let _ = task_store
                        .update_status(
                            &task_id,
                            crate::protocol::types::TaskStatus::Failed,
                            Some(error_message.clone()),
                        )
                        .await;

                    // The failure is also stored as a result payload.
                    let error_result = serde_json::json!({
                        "content": [{
                            "type": "text",
                            "text": error_message
                        }],
                        "isError": true
                    });
                    let _ = task_store.store_result(&task_id, error_result).await;
                }
            }
        });

        // The join handle is dropped: the task runs detached.
        success_response(
            ctx.request.id,
            serde_json::to_value(crate::protocol::types::CreateTaskResult { task }).unwrap(),
        )
    }
1049
1050 async fn handle_resources_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1052 if let Err(err) = require_initialization(&ctx) {
1053 return error_response(ctx.request.id, err.to_jsonrpc());
1054 }
1055
1056 let cursor = ctx
1058 .params()
1059 .and_then(|p| p.get("cursor"))
1060 .and_then(|c| c.as_str());
1061
1062 let visibility_ctx = self.visibility_context(&ctx.session);
1063 let all_resources = self
1064 .resource_manager
1065 .list_for_session(&ctx.session, &visibility_ctx);
1066
1067 let result = crate::utils::paginate(&all_resources, cursor, crate::utils::DEFAULT_PAGE_SIZE);
1069
1070 let mut response = serde_json::json!({"resources": result.items});
1072 if let Some(next) = result.next_cursor {
1073 response["nextCursor"] = serde_json::Value::String(next);
1074 }
1075 success_response(ctx.request.id, response)
1076 }
1077
1078 async fn handle_resources_templates_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1080 if let Err(err) = require_initialization(&ctx) {
1081 return error_response(ctx.request.id, err.to_jsonrpc());
1082 }
1083
1084 let cursor = ctx
1086 .params()
1087 .and_then(|p| p.get("cursor"))
1088 .and_then(|c| c.as_str());
1089
1090 let visibility_ctx = self.visibility_context(&ctx.session);
1091 let all_templates = self
1092 .resource_manager
1093 .list_templates_for_session(&ctx.session, &visibility_ctx);
1094
1095 let result = crate::utils::paginate(&all_templates, cursor, crate::utils::DEFAULT_PAGE_SIZE);
1097
1098 let mut response = serde_json::json!({"resourceTemplates": result.items});
1100 if let Some(next) = result.next_cursor {
1101 response["nextCursor"] = serde_json::Value::String(next);
1102 }
1103 success_response(ctx.request.id, response)
1104 }
1105
1106 async fn handle_resources_read(&self, ctx: RequestContext) -> JsonRpcResponse {
1108 if let Err(err) = require_initialization(&ctx) {
1109 return error_response(ctx.request.id, err.to_jsonrpc());
1110 }
1111
1112 let params = ctx.params().cloned().unwrap_or(Value::Null);
1114 let uri = match params.get("uri").and_then(|v| v.as_str()) {
1115 Some(uri) => uri,
1116 None => {
1117 return error_response(
1118 ctx.request.id,
1119 McpError::validation("invalid_params", "Missing 'uri' field").to_jsonrpc(),
1120 );
1121 }
1122 };
1123
1124 match self
1126 .resource_manager
1127 .read(
1128 uri,
1129 std::collections::HashMap::new(),
1130 &ctx.session,
1131 &self.logger,
1132 )
1133 .await
1134 {
1135 Ok(contents) => {
1136 let content_values: Vec<Value> = contents.iter().map(|c| c.to_value()).collect();
1138 success_response(
1139 ctx.request.id,
1140 serde_json::json!({"contents": content_values}),
1141 )
1142 }
1143 Err(e) => error_response(
1144 ctx.request.id,
1145 McpError::internal("resource_read_failed", e.to_string()).to_jsonrpc(),
1146 ),
1147 }
1148 }
1149
1150 async fn handle_resources_subscribe(&self, ctx: RequestContext) -> JsonRpcResponse {
1156 if let Err(err) = require_initialization(&ctx) {
1157 return error_response(ctx.request.id, err.to_jsonrpc());
1158 }
1159
1160 let caps = self.capabilities.load();
1162 if caps.resources.is_none()
1163 || caps.resources.as_ref().and_then(|r| r.subscribe) != Some(true)
1164 {
1165 return error_response(
1166 ctx.request.id,
1167 McpError::validation(
1168 "capability_not_supported",
1169 "Resource subscriptions are not enabled on this server",
1170 )
1171 .to_jsonrpc(),
1172 );
1173 }
1174
1175 let params = ctx.params().cloned().unwrap_or(Value::Null);
1177 let req: SubscribeRequest = match serde_json::from_value(params) {
1178 Ok(req) => req,
1179 Err(_) => {
1180 return error_response(
1181 ctx.request.id,
1182 McpError::validation("invalid_params", "Missing or invalid 'uri' field")
1183 .to_jsonrpc(),
1184 );
1185 }
1186 };
1187
1188 self.subscription_manager.subscribe(&ctx.session.id, &req.uri);
1190
1191 tracing::debug!(
1192 session_id = %ctx.session.id,
1193 uri = %req.uri,
1194 "Session subscribed to resource"
1195 );
1196
1197 success_response(ctx.request.id, serde_json::json!({}))
1198 }
1199
1200 async fn handle_resources_unsubscribe(&self, ctx: RequestContext) -> JsonRpcResponse {
1204 if let Err(err) = require_initialization(&ctx) {
1205 return error_response(ctx.request.id, err.to_jsonrpc());
1206 }
1207
1208 let params = ctx.params().cloned().unwrap_or(Value::Null);
1210 let req: UnsubscribeRequest = match serde_json::from_value(params) {
1211 Ok(req) => req,
1212 Err(_) => {
1213 return error_response(
1214 ctx.request.id,
1215 McpError::validation("invalid_params", "Missing or invalid 'uri' field")
1216 .to_jsonrpc(),
1217 );
1218 }
1219 };
1220
1221 self.subscription_manager
1223 .unsubscribe(&ctx.session.id, &req.uri);
1224
1225 tracing::debug!(
1226 session_id = %ctx.session.id,
1227 uri = %req.uri,
1228 "Session unsubscribed from resource"
1229 );
1230
1231 success_response(ctx.request.id, serde_json::json!({}))
1232 }
1233
1234 async fn handle_prompts_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1236 if let Err(err) = require_initialization(&ctx) {
1237 return error_response(ctx.request.id, err.to_jsonrpc());
1238 }
1239
1240 let cursor = ctx
1242 .params()
1243 .and_then(|p| p.get("cursor"))
1244 .and_then(|c| c.as_str());
1245
1246 let visibility_ctx = self.visibility_context(&ctx.session);
1247 let all_prompts = self
1248 .prompt_manager
1249 .list_for_session(&ctx.session, &visibility_ctx);
1250
1251 let result = crate::utils::paginate(&all_prompts, cursor, crate::utils::DEFAULT_PAGE_SIZE);
1253
1254 let mut response = serde_json::json!({"prompts": result.items});
1256 if let Some(next) = result.next_cursor {
1257 response["nextCursor"] = serde_json::Value::String(next);
1258 }
1259 success_response(ctx.request.id, response)
1260 }
1261
1262 async fn handle_prompts_get(&self, ctx: RequestContext) -> JsonRpcResponse {
1264 if let Err(err) = require_initialization(&ctx) {
1265 return error_response(ctx.request.id, err.to_jsonrpc());
1266 }
1267
1268 let params = ctx.params().cloned().unwrap_or(Value::Null);
1270 let prompt_name = match params.get("name").and_then(|v| v.as_str()) {
1271 Some(name) => name,
1272 None => {
1273 return error_response(
1274 ctx.request.id,
1275 McpError::validation("invalid_params", "Missing 'name' field").to_jsonrpc(),
1276 );
1277 }
1278 };
1279
1280 let prompt_params = params.get("arguments").cloned().unwrap_or(Value::Null);
1281
1282 match self
1284 .prompt_manager
1285 .call(prompt_name, prompt_params, &ctx.session, &self.logger)
1286 .await
1287 {
1288 Ok(result) => success_response(
1289 ctx.request.id,
1290 serde_json::to_value(result).expect("Failed to serialize prompt result"),
1291 ),
1292 Err(e) => error_response(
1293 ctx.request.id,
1294 McpError::internal("prompt_get_failed", e.to_string()).to_jsonrpc(),
1295 ),
1296 }
1297 }
1298
1299 async fn handle_roots_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1305 if let Err(err) = require_initialization(&ctx) {
1306 return error_response(ctx.request.id, err.to_jsonrpc());
1307 }
1308
1309 use crate::protocol::types::ListRootsResult;
1313
1314 let result = ListRootsResult { roots: vec![] };
1315
1316 success_response(
1317 ctx.request.id,
1318 serde_json::to_value(result).expect("Failed to serialize roots list"),
1319 )
1320 }
1321
1322 async fn handle_sampling_create_message(&self, ctx: RequestContext) -> JsonRpcResponse {
1328 if let Err(err) = require_initialization(&ctx) {
1329 return error_response(ctx.request.id, err.to_jsonrpc());
1330 }
1331
1332 error_response(
1335 ctx.request.id,
1336 McpError::not_implemented(
1337 "sampling/createMessage is a client capability. Use ClientRequester.create_message() for server→client requests."
1338 ).to_jsonrpc(),
1339 )
1340 }
1341
1342 async fn handle_elicitation_create(&self, ctx: RequestContext) -> JsonRpcResponse {
1347 if let Err(err) = require_initialization(&ctx) {
1348 return error_response(ctx.request.id, err.to_jsonrpc());
1349 }
1350
1351 error_response(
1354 ctx.request.id,
1355 McpError::not_implemented(
1356 "elicitation/create is a client capability. Use ClientRequester.create_elicitation() for server→client requests."
1357 ).to_jsonrpc(),
1358 )
1359 }
1360
1361 async fn handle_tasks_get(&self, ctx: RequestContext) -> JsonRpcResponse {
1363 if let Err(err) = require_initialization(&ctx) {
1364 return error_response(ctx.request.id, err.to_jsonrpc());
1365 }
1366
1367 let params: crate::protocol::types::GetTaskParams = match ctx.params() {
1368 Some(p) => match serde_json::from_value(p.clone()) {
1369 Ok(params) => params,
1370 Err(_) => {
1371 return error_response(
1372 ctx.request.id,
1373 McpError::validation("invalid_params", "Missing or invalid taskId")
1374 .to_jsonrpc(),
1375 );
1376 }
1377 },
1378 None => {
1379 return error_response(
1380 ctx.request.id,
1381 McpError::validation("invalid_params", "Missing taskId parameter").to_jsonrpc(),
1382 );
1383 }
1384 };
1385
1386 match self
1387 .task_store
1388 .get_task_for_session(¶ms.task_id, &ctx.session.id)
1389 .await
1390 {
1391 Some(task) => success_response(ctx.request.id, serde_json::to_value(task).unwrap()),
1392 None => error_response(
1393 ctx.request.id,
1394 McpError::validation("invalid_params", "Task not found").to_jsonrpc(),
1395 ),
1396 }
1397 }
1398
1399 async fn handle_tasks_result(&self, ctx: RequestContext) -> JsonRpcResponse {
1401 if let Err(err) = require_initialization(&ctx) {
1402 return error_response(ctx.request.id, err.to_jsonrpc());
1403 }
1404
1405 let params: crate::protocol::types::GetTaskParams = match ctx.params() {
1406 Some(p) => match serde_json::from_value(p.clone()) {
1407 Ok(params) => params,
1408 Err(_) => {
1409 return error_response(
1410 ctx.request.id,
1411 McpError::validation("invalid_params", "Missing or invalid taskId")
1412 .to_jsonrpc(),
1413 );
1414 }
1415 },
1416 None => {
1417 return error_response(
1418 ctx.request.id,
1419 McpError::validation("invalid_params", "Missing taskId parameter").to_jsonrpc(),
1420 );
1421 }
1422 };
1423
1424 if self
1426 .task_store
1427 .get_task_for_session(¶ms.task_id, &ctx.session.id)
1428 .await
1429 .is_none()
1430 {
1431 return error_response(
1432 ctx.request.id,
1433 McpError::validation("invalid_params", "Task not found").to_jsonrpc(),
1434 );
1435 }
1436
1437 match self
1439 .task_store
1440 .wait_for_result(¶ms.task_id, std::time::Duration::from_secs(300))
1441 .await
1442 {
1443 Ok(result) => success_response(ctx.request.id, result),
1444 Err(e) => error_response(
1445 ctx.request.id,
1446 McpError::internal("task_error", e.to_string()).to_jsonrpc(),
1447 ),
1448 }
1449 }
1450
1451 async fn handle_tasks_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1453 if let Err(err) = require_initialization(&ctx) {
1454 return error_response(ctx.request.id, err.to_jsonrpc());
1455 }
1456
1457 let cursor = ctx
1459 .params()
1460 .and_then(|p| p.get("cursor"))
1461 .and_then(|c| c.as_str());
1462
1463 let (tasks, next_cursor) = self
1464 .task_store
1465 .list_tasks(&ctx.session.id, cursor, 100)
1466 .await;
1467
1468 success_response(
1469 ctx.request.id,
1470 serde_json::json!({
1471 "tasks": tasks,
1472 "nextCursor": next_cursor,
1473 }),
1474 )
1475 }
1476
1477 async fn handle_tasks_cancel(&self, ctx: RequestContext) -> JsonRpcResponse {
1479 if let Err(err) = require_initialization(&ctx) {
1480 return error_response(ctx.request.id, err.to_jsonrpc());
1481 }
1482
1483 let params: crate::protocol::types::CancelTaskParams = match ctx.params() {
1484 Some(p) => match serde_json::from_value(p.clone()) {
1485 Ok(params) => params,
1486 Err(_) => {
1487 return error_response(
1488 ctx.request.id,
1489 McpError::validation("invalid_params", "Missing or invalid taskId")
1490 .to_jsonrpc(),
1491 );
1492 }
1493 },
1494 None => {
1495 return error_response(
1496 ctx.request.id,
1497 McpError::validation("invalid_params", "Missing taskId parameter").to_jsonrpc(),
1498 );
1499 }
1500 };
1501
1502 match self
1503 .task_store
1504 .cancel_task(¶ms.task_id, &ctx.session.id)
1505 .await
1506 {
1507 Ok(task) => success_response(ctx.request.id, serde_json::to_value(task).unwrap()),
1508 Err(e) => {
1509 let error_msg = match e {
1510 crate::managers::task::TaskError::NotFound(_) => {
1511 McpError::validation("invalid_params", "Task not found")
1512 }
1513 crate::managers::task::TaskError::AlreadyTerminal(status) => {
1514 McpError::validation(
1515 "invalid_params",
1516 format!(
1517 "Cannot cancel task: already in terminal status '{:?}'",
1518 status
1519 ),
1520 )
1521 }
1522 _ => McpError::internal("task_error", e.to_string()),
1523 };
1524 error_response(ctx.request.id, error_msg.to_jsonrpc())
1525 }
1526 }
1527 }
1528
1529 async fn handle_completion_complete(&self, ctx: RequestContext) -> JsonRpcResponse {
1534 if let Err(err) = require_initialization(&ctx) {
1535 return error_response(ctx.request.id, err.to_jsonrpc());
1536 }
1537
1538 let caps = self.capabilities.load();
1540 if caps.completion.is_none() {
1541 return error_response(
1542 ctx.request.id,
1543 McpError::not_implemented("Completion capability not enabled").to_jsonrpc(),
1544 );
1545 }
1546
1547 let params = ctx.params().cloned().unwrap_or(Value::Null);
1549 let request: crate::protocol::types::CompleteRequest = match serde_json::from_value(params)
1550 {
1551 Ok(req) => req,
1552 Err(e) => {
1553 return error_response(
1554 ctx.request.id,
1555 McpError::validation(
1556 "invalid_params",
1557 format!("Invalid completion request: {}", e),
1558 )
1559 .to_jsonrpc(),
1560 );
1561 }
1562 };
1563
1564 match self.completion_manager.complete(request).await {
1566 Ok(result) => success_response(
1567 ctx.request.id,
1568 serde_json::to_value(result).expect("Failed to serialize completion result"),
1569 ),
1570 Err(e) => error_response(
1571 ctx.request.id,
1572 McpError::internal("completion_failed", e.to_string()).to_jsonrpc(),
1573 ),
1574 }
1575 }
1576
1577 async fn handle_notifications_cancelled(&self, ctx: RequestContext) -> JsonRpcResponse {
1583 let params: CancelledNotificationParams = match ctx.params() {
1585 Some(p) => match serde_json::from_value(p.clone()) {
1586 Ok(params) => params,
1587 Err(e) => {
1588 tracing::warn!(error = %e, "Invalid notifications/cancelled params");
1589 return success_response(ctx.request.id, serde_json::json!({}));
1591 }
1592 },
1593 None => {
1594 tracing::warn!("notifications/cancelled missing params");
1595 return success_response(ctx.request.id, serde_json::json!({}));
1596 }
1597 };
1598
1599 let request_id = params.request_id_string();
1600 tracing::debug!(
1601 request_id = %request_id,
1602 reason = ?params.reason,
1603 "Received cancellation request"
1604 );
1605
1606 self.cancellation_manager.cancel(&request_id, params.reason);
1608
1609 success_response(ctx.request.id, serde_json::json!({}))
1612 }
1613}
1614
1615#[cfg(test)]
1616mod tests {
1617 use super::*;
1618
1619 #[tokio::test]
1620 async fn test_server_creation() {
1621 let server = Server::new("test-server", "1.0.0");
1622 assert_eq!(server.name(), "test-server");
1623 assert_eq!(server.version(), "1.0.0");
1624 }
1625
1626 #[tokio::test]
1627 async fn test_ping() {
1628 let server = Server::new("test-server", "1.0.0");
1629
1630 let request = JsonRpcRequest {
1631 jsonrpc: "2.0".to_string(),
1632 id: Some(Value::Number(1.into())),
1633 method: "ping".to_string(),
1634 params: None,
1635 };
1636
1637 let response = server.handle_request("test-session", request).await;
1638
1639 assert!(response.result.is_some());
1640 assert!(response.error.is_none());
1641 }
1642
1643 #[tokio::test]
1644 async fn test_initialize() {
1645 let server = Server::new("test-server", "1.0.0");
1646
1647 let request = JsonRpcRequest {
1648 jsonrpc: "2.0".to_string(),
1649 id: Some(Value::Number(1.into())),
1650 method: "initialize".to_string(),
1651 params: Some(serde_json::json!({
1652 "protocolVersion": "2025-11-25",
1653 "capabilities": {},
1654 "clientInfo": {
1655 "name": "test-client",
1656 "version": "1.0.0"
1657 }
1658 })),
1659 };
1660
1661 let response = server.handle_request("test-session", request).await;
1662
1663 assert!(response.result.is_some());
1664 assert!(response.error.is_none());
1665
1666 let session = server.get_session("test-session").unwrap();
1668 assert!(session.is_initialized());
1669 assert_eq!(session.client_info.unwrap().name, "test-client");
1670 }
1671
1672 #[tokio::test]
1673 async fn test_method_not_found() {
1674 let server = Server::new("test-server", "1.0.0");
1675
1676 let request = JsonRpcRequest {
1677 jsonrpc: "2.0".to_string(),
1678 id: Some(Value::Number(1.into())),
1679 method: "unknown/method".to_string(),
1680 params: None,
1681 };
1682
1683 let response = server.handle_request("test-session", request).await;
1684
1685 assert!(response.result.is_none());
1686 assert!(response.error.is_some());
1687 assert_eq!(response.error.unwrap().code, -32601);
1688 }
1689
1690 #[tokio::test]
1691 async fn test_requires_initialization() {
1692 let server = Server::new("test-server", "1.0.0");
1693
1694 let request = JsonRpcRequest {
1695 jsonrpc: "2.0".to_string(),
1696 id: Some(Value::Number(1.into())),
1697 method: "tools/list".to_string(),
1698 params: None,
1699 };
1700
1701 let response = server.handle_request("test-session", request.clone()).await;
1703 assert!(response.error.is_some());
1704
1705 let init_request = JsonRpcRequest {
1707 jsonrpc: "2.0".to_string(),
1708 id: Some(Value::Number(2.into())),
1709 method: "initialize".to_string(),
1710 params: Some(serde_json::json!({
1711 "protocolVersion": "2025-11-25",
1712 "capabilities": {},
1713 "clientInfo": {
1714 "name": "test-client",
1715 "version": "1.0.0"
1716 }
1717 })),
1718 };
1719 server.handle_request("test-session", init_request).await;
1720
1721 let response = server.handle_request("test-session", request).await;
1723 assert!(response.result.is_some());
1724 }
1725
1726 #[tokio::test]
1727 async fn test_session_management() {
1728 let server = Server::new("test-server", "1.0.0");
1729
1730 let request = JsonRpcRequest {
1732 jsonrpc: "2.0".to_string(),
1733 id: Some(Value::Number(1.into())),
1734 method: "ping".to_string(),
1735 params: None,
1736 };
1737 server.handle_request("session-1", request).await;
1738
1739 assert!(server.get_session("session-1").is_some());
1741
1742 let removed = server.remove_session("session-1");
1744 assert!(removed.is_some());
1745
1746 assert!(server.get_session("session-1").is_none());
1748 }
1749
1750 #[tokio::test]
1751 async fn test_capabilities_update() {
1752 let server = Server::new("test-server", "1.0.0");
1753
1754 let caps = ServerCapabilities {
1755 tools: Some(crate::protocol::capabilities::ToolsCapability {
1756 list_changed: Some(true),
1757 }),
1758 ..Default::default()
1759 };
1760
1761 server.set_capabilities(caps.clone());
1762
1763 let loaded_caps = server.capabilities();
1764 assert_eq!(loaded_caps.tools, caps.tools);
1765 }
1766
1767 async fn init_test_session(server: &Server, session_id: &str) {
1773 let request = JsonRpcRequest {
1774 jsonrpc: "2.0".to_string(),
1775 id: Some(Value::Number(1.into())),
1776 method: "initialize".to_string(),
1777 params: Some(serde_json::json!({
1778 "protocolVersion": "2025-11-25",
1779 "capabilities": {
1780 "tasks": {
1781 "list": {},
1782 "cancel": {},
1783 "requests": {
1784 "tools": {
1785 "call": {}
1786 }
1787 }
1788 }
1789 },
1790 "clientInfo": {
1791 "name": "test-client",
1792 "version": "1.0.0"
1793 }
1794 })),
1795 };
1796
1797 server.handle_request(session_id, request).await;
1798 }
1799
1800 struct TestTaskTool;
1802
1803 #[async_trait::async_trait]
1804 impl crate::registry::tools::Tool for TestTaskTool {
1805 fn name(&self) -> &str {
1806 "test_task"
1807 }
1808
1809 fn description(&self) -> Option<&str> {
1810 Some("Test tool for task execution")
1811 }
1812
1813 fn input_schema(&self) -> Value {
1814 serde_json::json!({
1815 "type": "object",
1816 "properties": {
1817 "message": {"type": "string"}
1818 }
1819 })
1820 }
1821
1822 fn execution(&self) -> Option<crate::protocol::types::ToolExecution> {
1823 Some(crate::protocol::types::ToolExecution {
1824 task_support: Some(crate::protocol::types::TaskSupport::Optional),
1825 })
1826 }
1827
1828 async fn execute(
1829 &self,
1830 ctx: crate::prelude::ExecutionContext<'_>,
1831 ) -> Result<crate::registry::tools::ToolOutput, crate::registry::tools::ToolError> {
1832 let msg = ctx
1833 .params
1834 .get("message")
1835 .and_then(|v| v.as_str())
1836 .unwrap_or("default");
1837
1838 Ok(crate::registry::tools::ToolOutput::text(format!(
1839 "Processed: {}",
1840 msg
1841 )))
1842 }
1843 }
1844
1845 struct SlowTestTool;
1847
1848 #[async_trait::async_trait]
1849 impl crate::registry::tools::Tool for SlowTestTool {
1850 fn name(&self) -> &str {
1851 "slow_test"
1852 }
1853
1854 fn description(&self) -> Option<&str> {
1855 Some("Slow test tool")
1856 }
1857
1858 fn input_schema(&self) -> Value {
1859 serde_json::json!({"type": "object"})
1860 }
1861
1862 fn execution(&self) -> Option<crate::protocol::types::ToolExecution> {
1863 Some(crate::protocol::types::ToolExecution {
1864 task_support: Some(crate::protocol::types::TaskSupport::Optional),
1865 })
1866 }
1867
1868 async fn execute(
1869 &self,
1870 _ctx: crate::prelude::ExecutionContext<'_>,
1871 ) -> Result<crate::registry::tools::ToolOutput, crate::registry::tools::ToolError> {
1872 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1873 Ok(crate::registry::tools::ToolOutput::text(
1874 "Slow operation complete",
1875 ))
1876 }
1877 }
1878
1879 #[tokio::test]
1880 async fn test_task_augmented_tool_call() {
1881 let server = Server::new("test-server", "1.0.0");
1882 server.tool_registry().register(TestTaskTool);
1883
1884 init_test_session(&server, "test-session").await;
1885
1886 let request = JsonRpcRequest {
1888 jsonrpc: "2.0".to_string(),
1889 id: Some(Value::Number(2.into())),
1890 method: "tools/call".to_string(),
1891 params: Some(serde_json::json!({
1892 "name": "test_task",
1893 "arguments": {"message": "hello"},
1894 "task": {"ttl": 60000}
1895 })),
1896 };
1897
1898 let response = server.handle_request("test-session", request).await;
1899
1900 assert!(response.result.is_some());
1902 assert!(response.error.is_none());
1903
1904 let result = response.result.unwrap();
1905 assert!(result.get("task").is_some());
1906
1907 let task = result.get("task").unwrap();
1908 assert!(task.get("taskId").is_some());
1909 assert_eq!(task.get("status").unwrap().as_str().unwrap(), "working");
1910 assert!(task.get("createdAt").is_some());
1911 assert_eq!(task.get("ttl").unwrap().as_u64().unwrap(), 60000);
1912 }
1913
1914 #[tokio::test]
1915 async fn test_task_get_status() {
1916 let server = Server::new("test-server", "1.0.0");
1917 server.tool_registry().register(SlowTestTool);
1918
1919 init_test_session(&server, "test-session").await;
1920
1921 let create_request = JsonRpcRequest {
1923 jsonrpc: "2.0".to_string(),
1924 id: Some(Value::Number(2.into())),
1925 method: "tools/call".to_string(),
1926 params: Some(serde_json::json!({
1927 "name": "slow_test",
1928 "arguments": {},
1929 "task": {"ttl": 60000}
1930 })),
1931 };
1932
1933 let create_response = server.handle_request("test-session", create_request).await;
1934 let task_id = create_response.result.unwrap()["task"]["taskId"]
1935 .as_str()
1936 .unwrap()
1937 .to_string();
1938
1939 let get_request = JsonRpcRequest {
1941 jsonrpc: "2.0".to_string(),
1942 id: Some(Value::Number(3.into())),
1943 method: "tasks/get".to_string(),
1944 params: Some(serde_json::json!({"taskId": task_id})),
1945 };
1946
1947 let get_response = server.handle_request("test-session", get_request).await;
1948
1949 assert!(get_response.result.is_some());
1950 let result = get_response.result.unwrap();
1951 let status = result["status"].as_str().unwrap();
1952 assert!(status == "working" || status == "completed");
1953 }
1954
1955 #[tokio::test]
1956 async fn test_task_result_blocking() {
1957 let server = Server::new("test-server", "1.0.0");
1958 server.tool_registry().register(SlowTestTool);
1959
1960 init_test_session(&server, "test-session").await;
1961
1962 let create_request = JsonRpcRequest {
1964 jsonrpc: "2.0".to_string(),
1965 id: Some(Value::Number(2.into())),
1966 method: "tools/call".to_string(),
1967 params: Some(serde_json::json!({
1968 "name": "slow_test",
1969 "arguments": {},
1970 "task": {"ttl": 60000}
1971 })),
1972 };
1973
1974 let create_response = server.handle_request("test-session", create_request).await;
1975 let task_id = create_response.result.unwrap()["task"]["taskId"]
1976 .as_str()
1977 .unwrap()
1978 .to_string();
1979
1980 let result_request = JsonRpcRequest {
1982 jsonrpc: "2.0".to_string(),
1983 id: Some(Value::Number(3.into())),
1984 method: "tasks/result".to_string(),
1985 params: Some(serde_json::json!({"taskId": task_id})),
1986 };
1987
1988 let result_response = server.handle_request("test-session", result_request).await;
1989
1990 assert!(result_response.result.is_some());
1991 assert!(result_response.error.is_none());
1992
1993 let result = result_response.result.unwrap();
1995 assert!(result.get("content").is_some());
1996 }
1997
1998 #[tokio::test]
1999 async fn test_task_cancel() {
2000 let server = Server::new("test-server", "1.0.0");
2001 server.tool_registry().register(SlowTestTool);
2002
2003 init_test_session(&server, "test-session").await;
2004
2005 let create_request = JsonRpcRequest {
2007 jsonrpc: "2.0".to_string(),
2008 id: Some(Value::Number(2.into())),
2009 method: "tools/call".to_string(),
2010 params: Some(serde_json::json!({
2011 "name": "slow_test",
2012 "arguments": {},
2013 "task": {"ttl": 60000}
2014 })),
2015 };
2016
2017 let create_response = server.handle_request("test-session", create_request).await;
2018 let task_id = create_response.result.unwrap()["task"]["taskId"]
2019 .as_str()
2020 .unwrap()
2021 .to_string();
2022
2023 let cancel_request = JsonRpcRequest {
2025 jsonrpc: "2.0".to_string(),
2026 id: Some(Value::Number(3.into())),
2027 method: "tasks/cancel".to_string(),
2028 params: Some(serde_json::json!({"taskId": task_id})),
2029 };
2030
2031 let cancel_response = server.handle_request("test-session", cancel_request).await;
2032
2033 if cancel_response.result.is_some() {
2035 let result = cancel_response.result.unwrap();
2036 let status = result["status"].as_str().unwrap();
2037 assert_eq!(status, "cancelled");
2038 }
2039 }
2041
2042 #[tokio::test]
2043 async fn test_task_list() {
2044 let server = Server::new("test-server", "1.0.0");
2045 server.tool_registry().register(TestTaskTool);
2046
2047 init_test_session(&server, "test-session").await;
2048
2049 for i in 0..3 {
2051 let request = JsonRpcRequest {
2052 jsonrpc: "2.0".to_string(),
2053 id: Some(Value::Number((i + 2).into())),
2054 method: "tools/call".to_string(),
2055 params: Some(serde_json::json!({
2056 "name": "test_task",
2057 "arguments": {"message": format!("task-{}", i)},
2058 "task": {"ttl": 60000}
2059 })),
2060 };
2061 server.handle_request("test-session", request).await;
2062 }
2063
2064 let list_request = JsonRpcRequest {
2066 jsonrpc: "2.0".to_string(),
2067 id: Some(Value::Number(10.into())),
2068 method: "tasks/list".to_string(),
2069 params: None,
2070 };
2071
2072 let list_response = server.handle_request("test-session", list_request).await;
2073
2074 assert!(list_response.result.is_some());
2075 let result = list_response.result.unwrap();
2076 let tasks = result["tasks"].as_array().unwrap();
2077 assert!(tasks.len() >= 3);
2078 }
2079
2080 #[tokio::test]
2081 async fn test_task_session_isolation() {
2082 let server = Server::new("test-server", "1.0.0");
2083 server.tool_registry().register(TestTaskTool);
2084
2085 init_test_session(&server, "session-1").await;
2086 init_test_session(&server, "session-2").await;
2087
2088 let request = JsonRpcRequest {
2090 jsonrpc: "2.0".to_string(),
2091 id: Some(Value::Number(2.into())),
2092 method: "tools/call".to_string(),
2093 params: Some(serde_json::json!({
2094 "name": "test_task",
2095 "arguments": {"message": "private"},
2096 "task": {"ttl": 60000}
2097 })),
2098 };
2099
2100 let response = server.handle_request("session-1", request).await;
2101 let task_id = response.result.unwrap()["task"]["taskId"]
2102 .as_str()
2103 .unwrap()
2104 .to_string();
2105
2106 let get_request = JsonRpcRequest {
2108 jsonrpc: "2.0".to_string(),
2109 id: Some(Value::Number(3.into())),
2110 method: "tasks/get".to_string(),
2111 params: Some(serde_json::json!({"taskId": task_id})),
2112 };
2113
2114 let get_response = server.handle_request("session-2", get_request).await;
2115
2116 assert!(get_response.error.is_some());
2118 }
2119
2120 #[tokio::test]
2121 async fn test_task_not_found() {
2122 let server = Server::new("test-server", "1.0.0");
2123 init_test_session(&server, "test-session").await;
2124
2125 let request = JsonRpcRequest {
2126 jsonrpc: "2.0".to_string(),
2127 id: Some(Value::Number(2.into())),
2128 method: "tasks/get".to_string(),
2129 params: Some(serde_json::json!({"taskId": "nonexistent-task-id"})),
2130 };
2131
2132 let response = server.handle_request("test-session", request).await;
2133
2134 assert!(response.error.is_some());
2135 assert_eq!(response.error.unwrap().code, -32602);
2136 }
2137
2138 #[tokio::test]
2139 async fn test_task_double_cancel() {
2140 let server = Server::new("test-server", "1.0.0");
2141 server.tool_registry().register(SlowTestTool);
2142
2143 init_test_session(&server, "test-session").await;
2144
2145 let create_request = JsonRpcRequest {
2147 jsonrpc: "2.0".to_string(),
2148 id: Some(Value::Number(2.into())),
2149 method: "tools/call".to_string(),
2150 params: Some(serde_json::json!({
2151 "name": "slow_test",
2152 "arguments": {},
2153 "task": {"ttl": 60000}
2154 })),
2155 };
2156
2157 let create_response = server.handle_request("test-session", create_request).await;
2158 let task_id = create_response.result.unwrap()["task"]["taskId"]
2159 .as_str()
2160 .unwrap()
2161 .to_string();
2162
2163 let cancel_request = JsonRpcRequest {
2165 jsonrpc: "2.0".to_string(),
2166 id: Some(Value::Number(3.into())),
2167 method: "tasks/cancel".to_string(),
2168 params: Some(serde_json::json!({"taskId": task_id.clone()})),
2169 };
2170
2171 let _ = server
2172 .handle_request("test-session", cancel_request.clone())
2173 .await;
2174
2175 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2177
2178 let cancel_request2 = JsonRpcRequest {
2180 jsonrpc: "2.0".to_string(),
2181 id: Some(Value::Number(4.into())),
2182 method: "tasks/cancel".to_string(),
2183 params: Some(serde_json::json!({"taskId": task_id})),
2184 };
2185
2186 let cancel_response2 = server.handle_request("test-session", cancel_request2).await;
2187
2188 assert!(cancel_response2.error.is_some());
2190 }
2191
2192 #[tokio::test]
2197 async fn test_request_elicitation_no_capability() {
2198 let server = Server::new("test-server", "1.0.0");
2199
2200 let request = JsonRpcRequest {
2202 jsonrpc: "2.0".to_string(),
2203 id: Some(Value::Number(1.into())),
2204 method: "initialize".to_string(),
2205 params: Some(serde_json::json!({
2206 "protocolVersion": "2025-11-25",
2207 "capabilities": {
2208 },
2210 "clientInfo": {
2211 "name": "test-client",
2212 "version": "1.0.0"
2213 }
2214 })),
2215 };
2216 server.handle_request("test-session", request).await;
2217
2218 let result = server
2220 .request_elicitation(
2221 "test-session",
2222 "Please provide your email",
2223 serde_json::json!({"type": "object", "properties": {"email": {"type": "string"}}}),
2224 None,
2225 )
2226 .await;
2227
2228 assert!(matches!(
2230 result,
2231 Err(MultiplexerError::UnsupportedCapability(cap)) if cap == "elicitation"
2232 ));
2233 }
2234
2235 #[tokio::test]
2236 async fn test_request_elicitation_session_not_found() {
2237 let server = Server::new("test-server", "1.0.0");
2238
2239 let result = server
2241 .request_elicitation(
2242 "nonexistent-session",
2243 "Please provide your email",
2244 serde_json::json!({"type": "object", "properties": {"email": {"type": "string"}}}),
2245 None,
2246 )
2247 .await;
2248
2249 assert!(matches!(result, Err(MultiplexerError::Transport(_))));
2251 }
2252
2253 #[tokio::test]
2254 async fn test_client_requester_with_elicitation_capability() {
2255 let server = Server::new("test-server", "1.0.0");
2256
2257 let request = JsonRpcRequest {
2259 jsonrpc: "2.0".to_string(),
2260 id: Some(Value::Number(1.into())),
2261 method: "initialize".to_string(),
2262 params: Some(serde_json::json!({
2263 "protocolVersion": "2025-11-25",
2264 "capabilities": {
2265 "elicitation": {}
2266 },
2267 "clientInfo": {
2268 "name": "test-client",
2269 "version": "1.0.0"
2270 }
2271 })),
2272 };
2273 server.handle_request("test-session", request).await;
2274
2275 let requester = server.create_client_requester("test-session");
2277 assert!(requester.is_some());
2278 let requester = requester.unwrap();
2279 assert!(requester.supports_elicitation());
2280 }
2281
2282 async fn init_session_with_subscriptions(server: &Server, session_id: &str) {
2288 server.set_capabilities(ServerCapabilities {
2290 resources: Some(crate::protocol::capabilities::ResourcesCapability {
2291 subscribe: Some(true),
2292 list_changed: Some(true),
2293 list_templates: Some(true),
2294 }),
2295 ..Default::default()
2296 });
2297
2298 let request = JsonRpcRequest {
2299 jsonrpc: "2.0".to_string(),
2300 id: Some(Value::Number(1.into())),
2301 method: "initialize".to_string(),
2302 params: Some(serde_json::json!({
2303 "protocolVersion": "2025-11-25",
2304 "capabilities": {},
2305 "clientInfo": {
2306 "name": "test-client",
2307 "version": "1.0.0"
2308 }
2309 })),
2310 };
2311 server.handle_request(session_id, request).await;
2312 }
2313
2314 #[tokio::test]
2315 async fn test_resources_subscribe() {
2316 let server = Server::new("test-server", "1.0.0");
2317 init_session_with_subscriptions(&server, "test-session").await;
2318
2319 let request = JsonRpcRequest {
2320 jsonrpc: "2.0".to_string(),
2321 id: Some(Value::Number(2.into())),
2322 method: "resources/subscribe".to_string(),
2323 params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2324 };
2325
2326 let response = server.handle_request("test-session", request).await;
2327
2328 assert!(response.result.is_some());
2329 assert!(response.error.is_none());
2330
2331 assert!(server
2333 .subscription_manager()
2334 .is_subscribed("test-session", "file:///test.txt"));
2335 }
2336
2337 #[tokio::test]
2338 async fn test_resources_unsubscribe() {
2339 let server = Server::new("test-server", "1.0.0");
2340 init_session_with_subscriptions(&server, "test-session").await;
2341
2342 let subscribe_request = JsonRpcRequest {
2344 jsonrpc: "2.0".to_string(),
2345 id: Some(Value::Number(2.into())),
2346 method: "resources/subscribe".to_string(),
2347 params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2348 };
2349 server
2350 .handle_request("test-session", subscribe_request)
2351 .await;
2352
2353 let unsubscribe_request = JsonRpcRequest {
2355 jsonrpc: "2.0".to_string(),
2356 id: Some(Value::Number(3.into())),
2357 method: "resources/unsubscribe".to_string(),
2358 params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2359 };
2360 let response = server
2361 .handle_request("test-session", unsubscribe_request)
2362 .await;
2363
2364 assert!(response.result.is_some());
2365 assert!(response.error.is_none());
2366
2367 assert!(!server
2369 .subscription_manager()
2370 .is_subscribed("test-session", "file:///test.txt"));
2371 }
2372
2373 #[tokio::test]
2374 async fn test_resources_subscribe_without_capability() {
2375 let server = Server::new("test-server", "1.0.0");
2376
2377 let request = JsonRpcRequest {
2379 jsonrpc: "2.0".to_string(),
2380 id: Some(Value::Number(1.into())),
2381 method: "initialize".to_string(),
2382 params: Some(serde_json::json!({
2383 "protocolVersion": "2025-11-25",
2384 "capabilities": {},
2385 "clientInfo": {
2386 "name": "test-client",
2387 "version": "1.0.0"
2388 }
2389 })),
2390 };
2391 server.handle_request("test-session", request).await;
2392
2393 let subscribe_request = JsonRpcRequest {
2395 jsonrpc: "2.0".to_string(),
2396 id: Some(Value::Number(2.into())),
2397 method: "resources/subscribe".to_string(),
2398 params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2399 };
2400 let response = server
2401 .handle_request("test-session", subscribe_request)
2402 .await;
2403
2404 assert!(response.error.is_some());
2405 }
2406
2407 #[tokio::test]
2408 async fn test_session_removal_cleans_subscriptions() {
2409 let server = Server::new("test-server", "1.0.0");
2410 init_session_with_subscriptions(&server, "test-session").await;
2411
2412 for uri in &["file:///a.txt", "file:///b.txt", "file:///c.txt"] {
2414 let request = JsonRpcRequest {
2415 jsonrpc: "2.0".to_string(),
2416 id: Some(Value::Number(2.into())),
2417 method: "resources/subscribe".to_string(),
2418 params: Some(serde_json::json!({"uri": uri})),
2419 };
2420 server.handle_request("test-session", request).await;
2421 }
2422
2423 assert_eq!(
2425 server
2426 .subscription_manager()
2427 .get_session_subscriptions("test-session")
2428 .len(),
2429 3
2430 );
2431
2432 server.remove_session("test-session");
2434
2435 assert_eq!(
2437 server
2438 .subscription_manager()
2439 .get_session_subscriptions("test-session")
2440 .len(),
2441 0
2442 );
2443 }
2444
2445 #[tokio::test]
2446 async fn test_notify_resource_updated() {
2447 let server = Server::new("test-server", "1.0.0");
2448 init_session_with_subscriptions(&server, "test-session").await;
2449
2450 let request = JsonRpcRequest {
2452 jsonrpc: "2.0".to_string(),
2453 id: Some(Value::Number(2.into())),
2454 method: "resources/subscribe".to_string(),
2455 params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2456 };
2457 server.handle_request("test-session", request).await;
2458
2459 server.notify_resource_updated("file:///test.txt");
2462
2463 }
2466
2467 #[tokio::test]
2468 async fn test_subscription_requires_initialization() {
2469 let server = Server::new("test-server", "1.0.0");
2470
2471 let request = JsonRpcRequest {
2473 jsonrpc: "2.0".to_string(),
2474 id: Some(Value::Number(1.into())),
2475 method: "resources/subscribe".to_string(),
2476 params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2477 };
2478
2479 let response = server.handle_request("test-session", request).await;
2480
2481 assert!(response.error.is_some());
2483 }
2484
2485 async fn init_session_with_completion(server: &Server, session_id: &str) {
2491 server.set_capabilities(ServerCapabilities {
2493 completion: Some(crate::protocol::capabilities::CompletionCapability {}),
2494 ..Default::default()
2495 });
2496
2497 let request = JsonRpcRequest {
2498 jsonrpc: "2.0".to_string(),
2499 id: Some(Value::Number(1.into())),
2500 method: "initialize".to_string(),
2501 params: Some(serde_json::json!({
2502 "protocolVersion": "2025-11-25",
2503 "capabilities": {},
2504 "clientInfo": {
2505 "name": "test-client",
2506 "version": "1.0.0"
2507 }
2508 })),
2509 };
2510 server.handle_request(session_id, request).await;
2511 }
2512
2513 #[tokio::test]
2514 async fn test_completion_complete_prompt() {
2515 use crate::managers::completion::CompletionProvider;
2516 use crate::protocol::types::CompletionValue;
2517 use std::sync::Arc;
2518
2519 let server = Server::new("test-server", "1.0.0");
2520
2521 struct TestProvider;
2523
2524 #[async_trait::async_trait]
2525 impl CompletionProvider for TestProvider {
2526 async fn complete_prompt(
2527 &self,
2528 prompt_name: &str,
2529 argument_name: &str,
2530 argument_value: &str,
2531 ) -> Vec<CompletionValue> {
2532 if prompt_name == "greet" && argument_name == "name" {
2533 vec!["Alice", "Amy", "Bob"]
2534 .into_iter()
2535 .filter(|v| v.to_lowercase().starts_with(&argument_value.to_lowercase()))
2536 .map(CompletionValue::new)
2537 .collect()
2538 } else {
2539 vec![]
2540 }
2541 }
2542 }
2543
2544 server
2546 .completion_manager()
2547 .register_prompt_provider("greet", Arc::new(TestProvider));
2548
2549 init_session_with_completion(&server, "test-session").await;
2550
2551 let request = JsonRpcRequest {
2553 jsonrpc: "2.0".to_string(),
2554 id: Some(Value::Number(2.into())),
2555 method: "completion/complete".to_string(),
2556 params: Some(serde_json::json!({
2557 "ref": {
2558 "type": "ref/prompt",
2559 "name": "greet"
2560 },
2561 "argument": {
2562 "name": "name",
2563 "value": "A"
2564 }
2565 })),
2566 };
2567
2568 let response = server.handle_request("test-session", request).await;
2569
2570 assert!(response.result.is_some());
2571 assert!(response.error.is_none());
2572
2573 let result = response.result.unwrap();
2574 let values = result["completion"]["values"].as_array().unwrap();
2575 assert_eq!(values.len(), 2); }
2577
2578 #[tokio::test]
2579 async fn test_completion_complete_no_capability() {
2580 let server = Server::new("test-server", "1.0.0");
2581
2582 let init_request = JsonRpcRequest {
2584 jsonrpc: "2.0".to_string(),
2585 id: Some(Value::Number(1.into())),
2586 method: "initialize".to_string(),
2587 params: Some(serde_json::json!({
2588 "protocolVersion": "2025-11-25",
2589 "capabilities": {},
2590 "clientInfo": {
2591 "name": "test-client",
2592 "version": "1.0.0"
2593 }
2594 })),
2595 };
2596 server.handle_request("test-session", init_request).await;
2597
2598 let request = JsonRpcRequest {
2600 jsonrpc: "2.0".to_string(),
2601 id: Some(Value::Number(2.into())),
2602 method: "completion/complete".to_string(),
2603 params: Some(serde_json::json!({
2604 "ref": {
2605 "type": "ref/prompt",
2606 "name": "greet"
2607 },
2608 "argument": {
2609 "name": "name",
2610 "value": "A"
2611 }
2612 })),
2613 };
2614
2615 let response = server.handle_request("test-session", request).await;
2616
2617 assert!(response.error.is_some());
2619 }
2620
2621 #[tokio::test]
2622 async fn test_completion_complete_not_initialized() {
2623 let server = Server::new("test-server", "1.0.0");
2624
2625 let request = JsonRpcRequest {
2627 jsonrpc: "2.0".to_string(),
2628 id: Some(Value::Number(1.into())),
2629 method: "completion/complete".to_string(),
2630 params: Some(serde_json::json!({
2631 "ref": {
2632 "type": "ref/prompt",
2633 "name": "greet"
2634 },
2635 "argument": {
2636 "name": "name",
2637 "value": "A"
2638 }
2639 })),
2640 };
2641
2642 let response = server.handle_request("test-session", request).await;
2643
2644 assert!(response.error.is_some());
2646 }
2647
2648 #[tokio::test]
2649 async fn test_completion_complete_invalid_params() {
2650 let server = Server::new("test-server", "1.0.0");
2651 init_session_with_completion(&server, "test-session").await;
2652
2653 let request = JsonRpcRequest {
2655 jsonrpc: "2.0".to_string(),
2656 id: Some(Value::Number(2.into())),
2657 method: "completion/complete".to_string(),
2658 params: Some(serde_json::json!({
2659 "invalid": "params"
2660 })),
2661 };
2662
2663 let response = server.handle_request("test-session", request).await;
2664
2665 assert!(response.error.is_some());
2667 let error = response.error.unwrap();
2668 assert_eq!(error.code, -32602); }
2670
2671 #[tokio::test]
2672 async fn test_completion_complete_resource() {
2673 use crate::managers::completion::CompletionProvider;
2674 use crate::protocol::types::CompletionValue;
2675 use std::sync::Arc;
2676
2677 let server = Server::new("test-server", "1.0.0");
2678
2679 struct FileProvider;
2681
2682 #[async_trait::async_trait]
2683 impl CompletionProvider for FileProvider {
2684 async fn complete_resource(
2685 &self,
2686 _uri: &str,
2687 _argument_name: &str,
2688 argument_value: &str,
2689 ) -> Vec<CompletionValue> {
2690 vec!["src/main.rs", "src/lib.rs", "tests/test.rs"]
2691 .into_iter()
2692 .filter(|v| v.starts_with(argument_value))
2693 .map(CompletionValue::new)
2694 .collect()
2695 }
2696 }
2697
2698 server
2700 .completion_manager()
2701 .register_resource_provider("file:///", Arc::new(FileProvider));
2702
2703 init_session_with_completion(&server, "test-session").await;
2704
2705 let request = JsonRpcRequest {
2707 jsonrpc: "2.0".to_string(),
2708 id: Some(Value::Number(2.into())),
2709 method: "completion/complete".to_string(),
2710 params: Some(serde_json::json!({
2711 "ref": {
2712 "type": "ref/resource",
2713 "uri": "file:///project"
2714 },
2715 "argument": {
2716 "name": "path",
2717 "value": "src/"
2718 }
2719 })),
2720 };
2721
2722 let response = server.handle_request("test-session", request).await;
2723
2724 assert!(response.result.is_some());
2725 assert!(response.error.is_none());
2726
2727 let result = response.result.unwrap();
2728 let values = result["completion"]["values"].as_array().unwrap();
2729 assert_eq!(values.len(), 2); }
2731}