mcp_host/server/
core.rs

1//! Server core implementation
2//!
3//! Main MCP server with method handlers and session management.
4//!
5//! # Bidirectional Communication
6//!
7//! The server supports bidirectional communication:
8//! - Client→Server: tools/call, resources/read, prompts/get, etc.
9//! - Server→Client: roots/list, sampling/createMessage, etc.
10//!
11//! Server-initiated requests are handled via the [`RequestMultiplexer`].
12
13use std::sync::Arc;
14use std::time::Duration;
15
16use arc_swap::ArcSwap;
17use dashmap::DashMap;
18use serde_json::Value;
19use tokio::sync::mpsc;
20
21use crate::content::types::Content;
22use crate::managers::cancellation::{CancellationManager, CancelledNotificationParams};
23use crate::managers::completion::CompletionManager;
24use crate::managers::subscription::{SubscribeRequest, SubscriptionManager, UnsubscribeRequest};
25use crate::protocol::capabilities::{InitializeRequest, InitializeResult, ServerCapabilities};
26use crate::protocol::errors::{ErrorType, McpError, codes};
27use crate::protocol::methods::McpMethod;
28use crate::protocol::types::{Implementation, JsonRpcRequest, JsonRpcResponse};
29use crate::protocol::version;
30use crate::registry::prompts::PromptManager;
31use crate::registry::resources::ResourceManager;
32use crate::registry::tools::{ToolError, ToolRegistry};
33use crate::server::handler::{
34    RequestContext, error_response, require_initialization, success_response,
35};
36use crate::server::middleware::MiddlewareChain;
37use crate::server::multiplexer::{
38    ClientRequester, CreateMessageParams, CreateMessageResult, JsonRpcClientRequest,
39    ListRootsResult, MultiplexerError, RequestMultiplexer, Root,
40};
41use crate::server::session::Session;
42use crate::server::visibility::{Environment, VisibilityContext};
43use crate::transport::traits::{IncomingMessage, JsonRpcNotification, Transport};
44
45fn tool_error_to_mcp_error(tool_name: &str, err: ToolError) -> McpError {
46    match err {
47        ToolError::NotFound(_) => McpError::not_found(
48            codes::TOOL_NOT_FOUND,
49            format!("Tool not found: {}", tool_name),
50        ),
51        ToolError::InvalidArguments(message) => {
52            McpError::validation(codes::INVALID_TOOL_ARGS, message)
53        }
54        ToolError::Execution(message) | ToolError::Internal(message) => {
55            McpError::internal(codes::TOOL_EXECUTION_FAILED, message)
56        }
57        ToolError::CircuitOpen { tool, message } => McpError::builder(
58            ErrorType::CircuitOpen,
59            codes::CIRCUIT_BREAKER_OPEN,
60        )
61        .message(message)
62        .detail("tool", tool)
63        .build(),
64    }
65}
66
67/// Main MCP server
68pub struct Server {
69    /// Server name
70    name: String,
71
72    /// Server version
73    version: String,
74
75    /// Server instructions for LLMs (atomically swappable)
76    instructions: Arc<ArcSwap<Option<String>>>,
77
78    /// Server capabilities (atomically swappable for dynamic updates)
79    capabilities: Arc<ArcSwap<ServerCapabilities>>,
80
81    /// Active sessions by session ID
82    sessions: DashMap<String, Session>,
83
84    /// Middleware chain for request processing
85    middleware: MiddlewareChain,
86
87    /// Tool registry
88    tool_registry: ToolRegistry,
89
90    /// Resource manager
91    resource_manager: ResourceManager,
92
93    /// Prompt manager
94    prompt_manager: PromptManager,
95
96    /// Notification sender (for background tasks to send notifications)
97    notification_tx: mpsc::UnboundedSender<JsonRpcNotification>,
98
99    /// Notification receiver (internal, used by run loop)
100    notification_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<JsonRpcNotification>>>,
101
102    /// Global logger for server notifications
103    logger: crate::logging::McpLogger,
104
105    /// Request multiplexer for server→client requests
106    multiplexer: Arc<RequestMultiplexer>,
107
108    /// Channel to send server→client requests to the transport
109    request_tx: mpsc::UnboundedSender<JsonRpcClientRequest>,
110
111    /// Receiver for server→client requests (used by run loop)
112    request_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<JsonRpcClientRequest>>>,
113
114    /// Task store for async task execution
115    task_store: Arc<crate::managers::task::TaskStore>,
116
117    /// Cancellation manager for tracking cancellable operations
118    cancellation_manager: CancellationManager,
119
120    /// Subscription manager for resource subscriptions
121    subscription_manager: SubscriptionManager,
122
123    /// Completion manager for argument value suggestions
124    completion_manager: CompletionManager,
125
126    /// Optional environment for visibility checks
127    environment: Option<Arc<dyn Environment>>,
128}
129
130impl Server {
131    /// Create new server
132    pub fn new(name: impl Into<String>, version: impl Into<String>) -> Self {
133        let (notification_tx, notification_rx) = mpsc::unbounded_channel();
134        let (request_tx, request_rx) = mpsc::unbounded_channel();
135
136        // Create task store with 5-minute TTL and 5-second poll interval
137        let task_store = Arc::new(crate::managers::task::TaskStore::new(
138            std::time::Duration::from_secs(300),
139            std::time::Duration::from_secs(5),
140        ));
141
142        // Spawn cleanup task if in tokio runtime context (runs every minute)
143        if tokio::runtime::Handle::try_current().is_ok() {
144            task_store
145                .clone()
146                .spawn_cleanup_task(std::time::Duration::from_secs(60));
147        }
148
149        // Default capabilities with tasks support
150        let default_caps = ServerCapabilities {
151            tasks: Some(crate::protocol::capabilities::TasksCapability {
152                list: Some(crate::protocol::capabilities::EmptyObject {}),
153                cancel: Some(crate::protocol::capabilities::EmptyObject {}),
154                requests: Some(crate::protocol::capabilities::TasksRequestsCapability {
155                    tools: Some(crate::protocol::capabilities::TasksToolsCapability {
156                        call: Some(crate::protocol::capabilities::EmptyObject {}),
157                    }),
158                    ..Default::default()
159                }),
160            }),
161            ..Default::default()
162        };
163
164        // Create logger with default config
165        let logger = crate::logging::McpLogger::new(notification_tx.clone(), "mcp-server");
166
167        Self {
168            name: name.into(),
169            version: version.into(),
170            instructions: Arc::new(ArcSwap::new(Arc::new(None))),
171            capabilities: Arc::new(ArcSwap::new(Arc::new(default_caps))),
172            sessions: DashMap::new(),
173            middleware: MiddlewareChain::new(),
174            tool_registry: ToolRegistry::new(),
175            resource_manager: ResourceManager::new(),
176            prompt_manager: PromptManager::new(),
177            notification_tx: notification_tx.clone(),
178            notification_rx: Arc::new(tokio::sync::Mutex::new(notification_rx)),
179            logger,
180            multiplexer: Arc::new(RequestMultiplexer::new()),
181            request_tx,
182            request_rx: Arc::new(tokio::sync::Mutex::new(request_rx)),
183            task_store,
184            cancellation_manager: CancellationManager::new(),
185            subscription_manager: SubscriptionManager::with_notifications(notification_tx),
186            completion_manager: CompletionManager::new(),
187            environment: None,
188        }
189    }
190
191    /// Get server name
192    pub fn name(&self) -> &str {
193        &self.name
194    }
195
196    /// Get server version
197    pub fn version(&self) -> &str {
198        &self.version
199    }
200
201    /// Get current capabilities
202    pub fn capabilities(&self) -> Arc<ServerCapabilities> {
203        self.capabilities.load_full()
204    }
205
206    /// Update server capabilities
207    pub fn set_capabilities(&self, capabilities: ServerCapabilities) {
208        self.capabilities.store(Arc::new(capabilities));
209    }
210
211    /// Get current instructions
212    pub fn instructions(&self) -> Option<String> {
213        (**self.instructions.load()).clone()
214    }
215
216    /// Set server instructions for LLMs
217    pub fn set_instructions(&self, instructions: Option<String>) {
218        self.instructions.store(Arc::new(instructions));
219    }
220
221    /// Set environment for visibility checks
222    pub fn set_environment(&mut self, environment: Arc<dyn Environment>) {
223        self.environment = Some(environment);
224    }
225
226    fn visibility_context<'a>(&'a self, session: &'a Session) -> VisibilityContext<'a> {
227        match self.environment.as_deref() {
228            Some(env) => VisibilityContext::with_environment(session, env),
229            None => VisibilityContext::new(session),
230        }
231    }
232
233    /// Add middleware to the chain
234    pub fn add_middleware(&mut self, middleware: crate::server::middleware::MiddlewareFn) {
235        self.middleware.add(middleware);
236    }
237
238    /// Get tool registry
239    pub fn tool_registry(&self) -> &ToolRegistry {
240        &self.tool_registry
241    }
242
243    /// Get resource manager
244    pub fn resource_manager(&self) -> &ResourceManager {
245        &self.resource_manager
246    }
247
248    /// Get prompt manager
249    pub fn prompt_manager(&self) -> &PromptManager {
250        &self.prompt_manager
251    }
252
253    /// Get subscription manager for resource subscriptions
254    pub fn subscription_manager(&self) -> &SubscriptionManager {
255        &self.subscription_manager
256    }
257
258    /// Get completion manager for argument value suggestions
259    pub fn completion_manager(&self) -> &CompletionManager {
260        &self.completion_manager
261    }
262
263    /// Get global logger
264    pub fn logger(&self) -> &crate::logging::McpLogger {
265        &self.logger
266    }
267
268    /// Get notification sender (for background tasks)
269    pub fn notification_sender(&self) -> mpsc::UnboundedSender<JsonRpcNotification> {
270        self.notification_tx.clone()
271    }
272
273    /// Send a notification to the client
274    pub fn send_notification(
275        &self,
276        method: impl Into<String>,
277        params: Option<Value>,
278    ) -> Result<(), Box<dyn std::error::Error>> {
279        let notification = JsonRpcNotification::new(method, params);
280        self.notification_tx.send(notification)?;
281        Ok(())
282    }
283
284    /// Get session by ID
285    pub fn get_session(&self, session_id: &str) -> Option<Session> {
286        self.sessions.get(session_id).map(|s| s.clone())
287    }
288
289    /// Remove session
290    ///
291    /// Also cleans up any resource subscriptions for the session.
292    pub fn remove_session(&self, session_id: &str) -> Option<Session> {
293        // Clean up subscriptions before removing session
294        self.subscription_manager.remove_session(session_id);
295        self.sessions.remove(session_id).map(|(_, s)| s)
296    }
297
298    /// Notify subscribers that a resource has been updated
299    ///
300    /// Sends `notifications/resources/updated` to all sessions subscribed to the URI.
301    /// Call this when a resource's content changes.
302    ///
303    /// # Example
304    ///
305    /// ```rust,ignore
306    /// // After updating a file resource
307    /// server.notify_resource_updated("file:///path/to/file.txt");
308    /// ```
309    pub fn notify_resource_updated(&self, uri: &str) {
310        self.subscription_manager.notify_resource_updated(uri);
311    }
312
313    /// Notify subscribers for multiple resources (batch operation)
314    ///
315    /// Convenience method for notifying about multiple resource updates at once.
316    pub fn notify_resources_updated(&self, uris: &[&str]) {
317        self.subscription_manager.notify_resources_updated(uris);
318    }
319
320    /// Get the request multiplexer (for advanced use cases)
321    pub fn multiplexer(&self) -> Arc<RequestMultiplexer> {
322        self.multiplexer.clone()
323    }
324
325    /// Get the cancellation manager
326    ///
327    /// Use this to register cancellable operations and check cancellation status.
328    pub fn cancellation_manager(&self) -> &CancellationManager {
329        &self.cancellation_manager
330    }
331
332    /// Create a client requester for the given session
333    ///
334    /// The client requester allows tools to make server→client requests
335    /// like roots/list, sampling/createMessage, and elicitation/create.
336    pub fn create_client_requester(&self, session_id: &str) -> Option<ClientRequester> {
337        let session = self.get_session(session_id)?;
338        let caps = session.capabilities.as_ref()?;
339
340        Some(ClientRequester::new(
341            self.request_tx.clone(),
342            self.multiplexer.clone(),
343            caps.roots.is_some(),
344            caps.sampling.is_some(),
345            caps.elicitation.is_some(),
346        ))
347    }
348
349    /// Request workspace roots from the client
350    ///
351    /// Sends a `roots/list` request to the client and waits for the response.
352    /// The client must have the `roots` capability advertised.
353    ///
354    /// # Arguments
355    ///
356    /// * `session_id` - The session to check for roots capability
357    /// * `timeout` - Optional timeout (defaults to 30 seconds)
358    ///
359    /// # Returns
360    ///
361    /// List of workspace roots, or an error if:
362    /// - Client doesn't support roots capability
363    /// - Request times out
364    /// - Client returns an error
365    ///
366    /// # Example
367    ///
368    /// ```rust,ignore
369    /// let roots = server.request_roots("session-123", None).await?;
370    /// for root in roots {
371    ///     println!("Root: {} ({})", root.name.unwrap_or_default(), root.uri);
372    /// }
373    /// ```
374    pub async fn request_roots(
375        &self,
376        session_id: &str,
377        timeout: Option<Duration>,
378    ) -> Result<Vec<Root>, MultiplexerError> {
379        // Check if client supports roots
380        if let Some(session) = self.get_session(session_id) {
381            if let Some(caps) = &session.capabilities {
382                if caps.roots.is_none() {
383                    return Err(MultiplexerError::UnsupportedCapability("roots".to_string()));
384                }
385            } else {
386                return Err(MultiplexerError::UnsupportedCapability("roots".to_string()));
387            }
388        } else {
389            return Err(MultiplexerError::Transport("session not found".to_string()));
390        }
391
392        // Create pending request
393        let (id, rx) = self.multiplexer.create_pending("roots/list");
394
395        // Build and send the request
396        let request = JsonRpcClientRequest::new(&id, "roots/list", Some(serde_json::json!({})));
397
398        self.request_tx
399            .send(request)
400            .map_err(|e| MultiplexerError::Transport(e.to_string()))?;
401
402        // Wait for response with timeout
403        let timeout = timeout.unwrap_or(self.multiplexer.default_timeout());
404        let result = tokio::time::timeout(timeout, rx)
405            .await
406            .map_err(|_| MultiplexerError::Timeout(timeout))?
407            .map_err(|_| MultiplexerError::ChannelClosed)??;
408
409        // Parse the result
410        let list_result: ListRootsResult = serde_json::from_value(result)?;
411        Ok(list_result.roots)
412    }
413
414    /// Request an LLM completion from the client
415    ///
416    /// Sends a `sampling/createMessage` request to the client.
417    /// The client must have the `sampling` capability advertised.
418    ///
419    /// # Arguments
420    ///
421    /// * `session_id` - The session to check for sampling capability
422    /// * `params` - The sampling parameters
423    /// * `timeout` - Optional timeout (defaults to 30 seconds)
424    ///
425    /// # Returns
426    ///
427    /// The completion result, or an error if:
428    /// - Client doesn't support sampling capability
429    /// - Request times out
430    /// - Client returns an error
431    ///
432    /// # Example
433    ///
434    /// ```rust,ignore
435    /// use mcp_host::server::multiplexer::{CreateMessageParams, SamplingContent, SamplingMessage};
436    ///
437    /// let params = CreateMessageParams {
438    ///     messages: vec![SamplingMessage {
439    ///         role: "user".to_string(),
440    ///         content: SamplingContent::Text { text: "Hello!".to_string() },
441    ///     }],
442    ///     max_tokens: 1000,
443    ///     ..Default::default()
444    /// };
445    ///
446    /// let result = server.request_sampling("session-123", params, None).await?;
447    /// println!("Response: {:?}", result.content);
448    /// ```
449    pub async fn request_sampling(
450        &self,
451        session_id: &str,
452        params: CreateMessageParams,
453        timeout: Option<Duration>,
454    ) -> Result<CreateMessageResult, MultiplexerError> {
455        // Check if client supports sampling
456        if let Some(session) = self.get_session(session_id) {
457            if let Some(caps) = &session.capabilities {
458                if caps.sampling.is_none() {
459                    return Err(MultiplexerError::UnsupportedCapability(
460                        "sampling".to_string(),
461                    ));
462                }
463            } else {
464                return Err(MultiplexerError::UnsupportedCapability(
465                    "sampling".to_string(),
466                ));
467            }
468        } else {
469            return Err(MultiplexerError::Transport("session not found".to_string()));
470        }
471
472        // Create pending request
473        let (id, rx) = self.multiplexer.create_pending("sampling/createMessage");
474
475        // Build and send the request
476        let params_value = serde_json::to_value(&params)?;
477        let request = JsonRpcClientRequest::new(&id, "sampling/createMessage", Some(params_value));
478
479        self.request_tx
480            .send(request)
481            .map_err(|e| MultiplexerError::Transport(e.to_string()))?;
482
483        // Wait for response with timeout
484        let timeout = timeout.unwrap_or(self.multiplexer.default_timeout());
485        let result = tokio::time::timeout(timeout, rx)
486            .await
487            .map_err(|_| MultiplexerError::Timeout(timeout))?
488            .map_err(|_| MultiplexerError::ChannelClosed)??;
489
490        // Parse the result
491        let create_result: CreateMessageResult = serde_json::from_value(result)?;
492        Ok(create_result)
493    }
494
495    /// Request structured user input from the client
496    ///
497    /// Sends an `elicitation/create` request to the client.
498    /// The client must have the `elicitation` capability advertised.
499    ///
500    /// # Arguments
501    ///
502    /// * `session_id` - The session to check for elicitation capability
503    /// * `message` - The message to show the user
504    /// * `requested_schema` - JSON Schema for the structured input
505    /// * `timeout` - Optional timeout (defaults to 30 seconds)
506    ///
507    /// # Returns
508    ///
509    /// The elicitation result (action + optional content), or an error if:
510    /// - Client doesn't support elicitation capability
511    /// - Request times out
512    /// - Client returns an error
513    ///
514    /// # Example
515    ///
516    /// ```rust,ignore
517    /// use mcp_host::protocol::elicitation::ElicitationSchema;
518    /// use mcp_host::protocol::types::ElicitationAction;
519    ///
520    /// let schema = ElicitationSchema::builder()
521    ///     .required_email("email")
522    ///     .required_integer("age", 0, 150)
523    ///     .optional_bool("newsletter", false)
524    ///     .build_unchecked();
525    ///
526    /// let result = server.request_elicitation(
527    ///     "session-123",
528    ///     "Please provide your information",
529    ///     serde_json::to_value(&schema).unwrap(),
530    ///     None,
531    /// ).await?;
532    ///
533    /// match result.action {
534    ///     ElicitationAction::Accept => {
535    ///         if let Some(content) = result.content {
536    ///             let email = content["email"].as_str().unwrap();
537    ///             let age = content["age"].as_i64().unwrap();
538    ///             println!("User: {} (age {})", email, age);
539    ///         }
540    ///     }
541    ///     ElicitationAction::Decline => println!("User declined"),
542    ///     ElicitationAction::Cancel => println!("User cancelled"),
543    /// }
544    /// ```
545    pub async fn request_elicitation(
546        &self,
547        session_id: &str,
548        message: impl Into<String>,
549        requested_schema: Value,
550        timeout: Option<Duration>,
551    ) -> Result<crate::protocol::types::CreateElicitationResult, MultiplexerError> {
552        // Check if client supports elicitation
553        if let Some(session) = self.get_session(session_id) {
554            if let Some(caps) = &session.capabilities {
555                if caps.elicitation.is_none() {
556                    return Err(MultiplexerError::UnsupportedCapability(
557                        "elicitation".to_string(),
558                    ));
559                }
560            } else {
561                return Err(MultiplexerError::UnsupportedCapability(
562                    "elicitation".to_string(),
563                ));
564            }
565        } else {
566            return Err(MultiplexerError::Transport("session not found".to_string()));
567        }
568
569        // Create pending request
570        let (id, rx) = self.multiplexer.create_pending("elicitation/create");
571
572        // Build and send the request
573        let params = serde_json::json!({
574            "message": message.into(),
575            "requestedSchema": requested_schema,
576        });
577        let request = JsonRpcClientRequest::new(&id, "elicitation/create", Some(params));
578
579        self.request_tx
580            .send(request)
581            .map_err(|e| MultiplexerError::Transport(e.to_string()))?;
582
583        // Wait for response with timeout
584        let timeout = timeout.unwrap_or(self.multiplexer.default_timeout());
585        let result = tokio::time::timeout(timeout, rx)
586            .await
587            .map_err(|_| MultiplexerError::Timeout(timeout))?
588            .map_err(|_| MultiplexerError::ChannelClosed)??;
589
590        // Parse the result
591        let elicitation_result: crate::protocol::types::CreateElicitationResult =
592            serde_json::from_value(result)?;
593        Ok(elicitation_result)
594    }
595
596    /// Run server with the given transport
597    ///
598    /// This is the main event loop that reads requests from the transport,
599    /// processes them, and writes responses back.
600    ///
601    /// Supports bidirectional communication:
602    /// - Incoming client requests are handled and responses sent
603    /// - Server-initiated requests are sent via channels and responses routed back
604    pub async fn run<T: Transport>(
605        &self,
606        mut transport: T,
607    ) -> Result<(), Box<dyn std::error::Error>> {
608        // Generate a unique session ID per MCP spec (globally unique, cryptographically secure)
609        let session_id = uuid::Uuid::new_v4().to_string();
610
611        // Get receivers
612        let mut notification_rx = self.notification_rx.lock().await;
613        let mut request_rx = self.request_rx.lock().await;
614
615        loop {
616            tokio::select! {
617                // Handle outgoing notifications
618                Some(notification) = notification_rx.recv() => {
619                    tracing::debug!(method = %notification.method, "Sending notification");
620                    if let Err(e) = transport.send_notification(notification).await {
621                        tracing::error!(error = %e, "Failed to send notification");
622                    }
623                }
624
625                // Handle outgoing server→client requests
626                Some(request) = request_rx.recv() => {
627                    tracing::debug!(method = %request.method, id = %request.id, "Sending request to client");
628                    if let Err(e) = transport.send_request(request).await {
629                        tracing::error!(error = %e, "Failed to send request to client");
630                    }
631                }
632
633                // Handle incoming messages (requests or responses)
634                result = transport.read_incoming() => {
635                    match result {
636                        Ok(IncomingMessage::Request(request)) => {
637                            // Check if this is a notification (no ID) or a request (has ID)
638                            let is_notification = request.id.is_none();
639
640                            // Handle client request
641                            let response = self.handle_request(&session_id, request).await;
642
643                            // Only write response if this was NOT a notification
644                            // Notifications must NOT receive responses per JSON-RPC spec
645                            if !is_notification
646                                && let Err(e) = transport.write_message(response).await {
647                                    tracing::error!(error = %e, "Failed to write message");
648                                    break;
649                                }
650                        }
651                        Ok(IncomingMessage::Response(response)) => {
652                            // Route response to pending server-initiated request
653                            if !self.multiplexer.route_response(&response) {
654                                tracing::warn!(
655                                    id = ?response.id,
656                                    "Received response for unknown request ID"
657                                );
658                            }
659                        }
660                        Err(crate::transport::traits::TransportError::Closed) => {
661                            tracing::info!("Transport closed, shutting down");
662                            break;
663                        }
664                        Err(e) => {
665                            tracing::error!(error = %e, "Failed to read message");
666                            continue;
667                        }
668                    }
669                }
670            }
671        }
672
673        // Cancel any pending requests
674        self.multiplexer.cancel_all();
675
676        // Signal cancellation to all tracked operations
677        self.cancellation_manager.cancel_all();
678
679        // Shutdown transport gracefully
680        transport.shutdown().await?;
681
682        Ok(())
683    }
684
685    /// Handle incoming JSON-RPC request
686    pub async fn handle_request(
687        &self,
688        session_id: &str,
689        request: JsonRpcRequest,
690    ) -> JsonRpcResponse {
691        // Get or create session
692        let session = self
693            .sessions
694            .entry(session_id.to_string())
695            .or_insert_with(|| {
696                let mut session = Session::with_id(session_id);
697                session.set_notification_channel(self.notification_tx.clone());
698                session
699            })
700            .clone();
701
702        // Create request context
703        let ctx = RequestContext::new(session, request.clone());
704
705        // Process through middleware chain
706        let ctx = match self.middleware.process(ctx) {
707            Ok(ctx) => ctx,
708            Err(err) => return error_response(request.id, err.to_jsonrpc()),
709        };
710
711        // Route to appropriate handler
712        let method = McpMethod::from(request.method.clone());
713
714        match method {
715            McpMethod::Initialize => self.handle_initialize(ctx).await,
716            McpMethod::Ping => self.handle_ping(ctx).await,
717            McpMethod::LoggingSetLevel => self.handle_logging_set_level(ctx).await,
718            McpMethod::ToolsList => self.handle_tools_list(ctx).await,
719            McpMethod::ToolsCall => self.handle_tools_call(ctx).await,
720            McpMethod::ResourcesList => self.handle_resources_list(ctx).await,
721            McpMethod::ResourcesTemplatesList => self.handle_resources_templates_list(ctx).await,
722            McpMethod::ResourcesRead => self.handle_resources_read(ctx).await,
723            McpMethod::ResourcesSubscribe => self.handle_resources_subscribe(ctx).await,
724            McpMethod::ResourcesUnsubscribe => self.handle_resources_unsubscribe(ctx).await,
725            McpMethod::PromptsList => self.handle_prompts_list(ctx).await,
726            McpMethod::PromptsGet => self.handle_prompts_get(ctx).await,
727            McpMethod::RootsList => self.handle_roots_list(ctx).await,
728            McpMethod::SamplingCreateMessage => self.handle_sampling_create_message(ctx).await,
729            McpMethod::ElicitationCreate => self.handle_elicitation_create(ctx).await,
730            McpMethod::TasksGet => self.handle_tasks_get(ctx).await,
731            McpMethod::TasksResult => self.handle_tasks_result(ctx).await,
732            McpMethod::TasksList => self.handle_tasks_list(ctx).await,
733            McpMethod::TasksCancel => self.handle_tasks_cancel(ctx).await,
734            McpMethod::CompletionComplete => self.handle_completion_complete(ctx).await,
735            McpMethod::NotificationsCancelled => self.handle_notifications_cancelled(ctx).await,
736            _ => error_response(
737                request.id,
738                McpError::method_not_found(&request.method).to_jsonrpc(),
739            ),
740        }
741    }
742
743    /// Handle initialize request
744    async fn handle_initialize(&self, ctx: RequestContext) -> JsonRpcResponse {
745        // Parse initialize params
746        let params = ctx.params().cloned().unwrap_or(Value::Null);
747        let req: InitializeRequest = match serde_json::from_value(params) {
748            Ok(req) => req,
749            Err(_) => {
750                return error_response(
751                    ctx.request.id,
752                    McpError::validation("invalid_params", "Invalid initialize parameters")
753                        .to_jsonrpc(),
754                );
755            }
756        };
757
758        // Negotiate protocol version
759        let protocol_version = match version::negotiate_protocol_version(&req.protocol_version) {
760            Ok(version) => version,
761            Err(supported_versions) => {
762                tracing::warn!(
763                    client = %req.client_info.name,
764                    requested = %req.protocol_version,
765                    supported = ?supported_versions,
766                    "Unsupported protocol version"
767                );
768                return error_response(
769                    ctx.request.id,
770                    McpError::builder(ErrorType::Validation, "unsupported_protocol_version")
771                        .message("Unsupported protocol version")
772                        .detail(
773                            "supported",
774                            serde_json::to_value(&supported_versions).unwrap(),
775                        )
776                        .detail("requested", req.protocol_version.clone())
777                        .build()
778                        .to_jsonrpc(),
779                );
780            }
781        };
782
783        // Log client connection
784        tracing::info!(
785            client = %req.client_info.name,
786            version = %req.client_info.version,
787            protocol = %protocol_version,
788            "Client connected"
789        );
790
791        // Initialize session
792        if let Some(mut session) = self.sessions.get_mut(&ctx.session.id) {
793            session.initialize(req.client_info, req.capabilities, protocol_version.clone());
794        }
795
796        // Build response
797        let result = InitializeResult {
798            protocol_version,
799            capabilities: (**self.capabilities.load()).clone(),
800            server_info: Implementation {
801                name: self.name.clone(),
802                version: self.version.clone(),
803            },
804            instructions: self.instructions(),
805        };
806
807        success_response(
808            ctx.request.id,
809            serde_json::to_value(result).expect("Failed to serialize InitializeResult"),
810        )
811    }
812
813    /// Handle ping request
814    async fn handle_ping(&self, ctx: RequestContext) -> JsonRpcResponse {
815        success_response(ctx.request.id, serde_json::json!({}))
816    }
817
818    /// Handle logging/setLevel request
819    async fn handle_logging_set_level(&self, ctx: RequestContext) -> JsonRpcResponse {
820        use crate::logging::LogLevel;
821        use crate::protocol::types::SetLevelRequest;
822
823        // Parse request params
824        let params = ctx.params().cloned().unwrap_or(Value::Null);
825        let req: SetLevelRequest = match serde_json::from_value(params) {
826            Ok(req) => req,
827            Err(_) => {
828                return error_response(
829                    ctx.request.id,
830                    McpError::validation("invalid_params", "Invalid setLevel parameters")
831                        .to_jsonrpc(),
832                );
833            }
834        };
835
836        // Parse log level
837        let level = match req.level.parse::<LogLevel>() {
838            Ok(level) => level,
839            Err(_) => {
840                return error_response(
841                    ctx.request.id,
842                    McpError::validation(
843                        "invalid_level",
844                        format!(
845                            "Invalid log level '{}'. Valid levels: debug, info, notice, warning, error, critical, alert, emergency",
846                            req.level
847                        ),
848                    )
849                    .to_jsonrpc(),
850                )
851            }
852        };
853
854        // Update logger min level
855        self.logger.set_min_level(level);
856
857        tracing::debug!(level = %req.level, "Log level updated");
858
859        success_response(ctx.request.id, serde_json::json!({}))
860    }
861
862    /// Handle tools/list request (with pagination support)
863    async fn handle_tools_list(&self, ctx: RequestContext) -> JsonRpcResponse {
864        if let Err(err) = require_initialization(&ctx) {
865            return error_response(ctx.request.id, err.to_jsonrpc());
866        }
867
868        // Parse cursor if present
869        let cursor = ctx
870            .params()
871            .and_then(|p| p.get("cursor"))
872            .and_then(|c| c.as_str());
873
874        let visibility_ctx = self.visibility_context(&ctx.session);
875        let all_tools = self
876            .tool_registry
877            .list_for_session(&ctx.session, &visibility_ctx);
878
879        // Apply pagination
880        let result = crate::utils::paginate(&all_tools, cursor, crate::utils::DEFAULT_PAGE_SIZE);
881
882        // Build response (only include nextCursor if present)
883        let mut response = serde_json::json!({"tools": result.items});
884        if let Some(next) = result.next_cursor {
885            response["nextCursor"] = serde_json::Value::String(next);
886        }
887        success_response(ctx.request.id, response)
888    }
889
890    /// Handle tools/call request
891    async fn handle_tools_call(&self, ctx: RequestContext) -> JsonRpcResponse {
892        if let Err(err) = require_initialization(&ctx) {
893            return error_response(ctx.request.id, err.to_jsonrpc());
894        }
895
896        // Parse tool call params
897        let params = ctx.params().cloned().unwrap_or(Value::Null);
898        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
899            Some(name) => name,
900            None => {
901                return error_response(
902                    ctx.request.id,
903                    McpError::validation("invalid_params", "Missing 'name' field").to_jsonrpc(),
904                );
905            }
906        };
907
908        let tool_params = params.get("arguments").cloned().unwrap_or(Value::Null);
909
910        // Check for task-augmented execution
911        let task_meta: Option<crate::protocol::types::TaskMetadata> = params
912            .get("task")
913            .and_then(|t| serde_json::from_value(t.clone()).ok());
914
915        if let Some(task_metadata) = task_meta {
916            // Task-augmented execution
917            return self
918                .handle_task_augmented_tool_call(ctx, tool_name, tool_params, task_metadata)
919                .await;
920        }
921
922        // Regular synchronous execution
923        let client_requester = self.create_client_requester(&ctx.session.id);
924        let visibility_ctx = self.visibility_context(&ctx.session);
925
926        match self
927            .tool_registry
928            .call_for_session(
929                tool_name,
930                tool_params,
931                &ctx.session,
932                &self.logger,
933                &visibility_ctx,
934                client_requester,
935            )
936            .await
937        {
938            Ok(output) => {
939                let result = match output {
940                    crate::registry::tools::ToolOutput::Content(items) => {
941                        let content_values: Vec<Value> =
942                            items.iter().map(|c| c.to_value()).collect();
943                        serde_json::json!({"content": content_values})
944                    }
945                    crate::registry::tools::ToolOutput::Structured(value) => {
946                        serde_json::json!({"structuredContent": value})
947                    }
948                };
949                success_response(ctx.request.id, result)
950            }
951            Err(e) => error_response(ctx.request.id, tool_error_to_mcp_error(tool_name, e).to_jsonrpc()),
952        }
953    }
954
955    /// Handle task-augmented tool call
956    async fn handle_task_augmented_tool_call(
957        &self,
958        ctx: RequestContext,
959        tool_name: &str,
960        tool_params: Value,
961        task_metadata: crate::protocol::types::TaskMetadata,
962    ) -> JsonRpcResponse {
963        // Create task
964        let (task, _result_rx) =
965            self.task_store
966                .create_task(&ctx.session.id, ctx.request.clone(), task_metadata.ttl);
967
968        let task_id = task.task_id.clone();
969
970        // Spawn background execution
971        let task_store = self.task_store.clone();
972        let tool_registry = self.tool_registry.clone();
973        let logger = self.logger.clone();
974        let session = ctx.session.clone();
975        let environment = self.environment.clone();
976        let client_requester = self.create_client_requester(&ctx.session.id);
977        let tool_name = tool_name.to_string();
978
979        tokio::spawn(async move {
980            // Execute tool
981            let visibility_ctx = match environment.as_deref() {
982                Some(env) => VisibilityContext::with_environment(&session, env),
983                None => VisibilityContext::new(&session),
984            };
985
986            match tool_registry
987                .call_for_session(
988                    &tool_name,
989                    tool_params,
990                    &session,
991                    &logger,
992                    &visibility_ctx,
993                    client_requester,
994                )
995                .await
996            {
997                Ok(output) => {
998                    // Success - store result
999                    let result = match output {
1000                        crate::registry::tools::ToolOutput::Content(items) => {
1001                            let content_values: Vec<Value> =
1002                                items.iter().map(|c| c.to_value()).collect();
1003                            serde_json::json!({"content": content_values})
1004                        }
1005                        crate::registry::tools::ToolOutput::Structured(value) => {
1006                            serde_json::json!({"structuredContent": value})
1007                        }
1008                    };
1009
1010                    let _ = task_store
1011                        .update_status(
1012                            &task_id,
1013                            crate::protocol::types::TaskStatus::Completed,
1014                            None,
1015                        )
1016                        .await;
1017                    let _ = task_store.store_result(&task_id, result).await;
1018                }
1019                Err(e) => {
1020                    // Failure - store error
1021                    let error_message = e.to_string();
1022                    let _ = task_store
1023                        .update_status(
1024                            &task_id,
1025                            crate::protocol::types::TaskStatus::Failed,
1026                            Some(error_message.clone()),
1027                        )
1028                        .await;
1029
1030                    // Store error as result (for tasks/result)
1031                    let error_result = serde_json::json!({
1032                        "content": [{
1033                            "type": "text",
1034                            "text": error_message
1035                        }],
1036                        "isError": true
1037                    });
1038                    let _ = task_store.store_result(&task_id, error_result).await;
1039                }
1040            }
1041        });
1042
1043        // Return immediately with task info
1044        success_response(
1045            ctx.request.id,
1046            serde_json::to_value(crate::protocol::types::CreateTaskResult { task }).unwrap(),
1047        )
1048    }
1049
1050    /// Handle resources/list request (with pagination support)
1051    async fn handle_resources_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1052        if let Err(err) = require_initialization(&ctx) {
1053            return error_response(ctx.request.id, err.to_jsonrpc());
1054        }
1055
1056        // Parse cursor if present
1057        let cursor = ctx
1058            .params()
1059            .and_then(|p| p.get("cursor"))
1060            .and_then(|c| c.as_str());
1061
1062        let visibility_ctx = self.visibility_context(&ctx.session);
1063        let all_resources = self
1064            .resource_manager
1065            .list_for_session(&ctx.session, &visibility_ctx);
1066
1067        // Apply pagination
1068        let result = crate::utils::paginate(&all_resources, cursor, crate::utils::DEFAULT_PAGE_SIZE);
1069
1070        // Build response (only include nextCursor if present)
1071        let mut response = serde_json::json!({"resources": result.items});
1072        if let Some(next) = result.next_cursor {
1073            response["nextCursor"] = serde_json::Value::String(next);
1074        }
1075        success_response(ctx.request.id, response)
1076    }
1077
1078    /// Handle resources/templates/list request (with pagination support)
1079    async fn handle_resources_templates_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1080        if let Err(err) = require_initialization(&ctx) {
1081            return error_response(ctx.request.id, err.to_jsonrpc());
1082        }
1083
1084        // Parse cursor if present
1085        let cursor = ctx
1086            .params()
1087            .and_then(|p| p.get("cursor"))
1088            .and_then(|c| c.as_str());
1089
1090        let visibility_ctx = self.visibility_context(&ctx.session);
1091        let all_templates = self
1092            .resource_manager
1093            .list_templates_for_session(&ctx.session, &visibility_ctx);
1094
1095        // Apply pagination
1096        let result = crate::utils::paginate(&all_templates, cursor, crate::utils::DEFAULT_PAGE_SIZE);
1097
1098        // Build response (only include nextCursor if present)
1099        let mut response = serde_json::json!({"resourceTemplates": result.items});
1100        if let Some(next) = result.next_cursor {
1101            response["nextCursor"] = serde_json::Value::String(next);
1102        }
1103        success_response(ctx.request.id, response)
1104    }
1105
1106    /// Handle resources/read request
1107    async fn handle_resources_read(&self, ctx: RequestContext) -> JsonRpcResponse {
1108        if let Err(err) = require_initialization(&ctx) {
1109            return error_response(ctx.request.id, err.to_jsonrpc());
1110        }
1111
1112        // Parse resource read params
1113        let params = ctx.params().cloned().unwrap_or(Value::Null);
1114        let uri = match params.get("uri").and_then(|v| v.as_str()) {
1115            Some(uri) => uri,
1116            None => {
1117                return error_response(
1118                    ctx.request.id,
1119                    McpError::validation("invalid_params", "Missing 'uri' field").to_jsonrpc(),
1120                );
1121            }
1122        };
1123
1124        // Read resource - ResourceContent already includes uri/mimeType
1125        match self
1126            .resource_manager
1127            .read(
1128                uri,
1129                std::collections::HashMap::new(),
1130                &ctx.session,
1131                &self.logger,
1132            )
1133            .await
1134        {
1135            Ok(contents) => {
1136                // ResourceContent.to_value() already produces MCP-compliant JSON with uri/mimeType
1137                let content_values: Vec<Value> = contents.iter().map(|c| c.to_value()).collect();
1138                success_response(
1139                    ctx.request.id,
1140                    serde_json::json!({"contents": content_values}),
1141                )
1142            }
1143            Err(e) => error_response(
1144                ctx.request.id,
1145                McpError::internal("resource_read_failed", e.to_string()).to_jsonrpc(),
1146            ),
1147        }
1148    }
1149
1150    /// Handle resources/subscribe request
1151    ///
1152    /// Subscribes the current session to receive notifications when a resource changes.
1153    /// When the resource is updated, the server will send a `notifications/resources/updated`
1154    /// notification to this session.
1155    async fn handle_resources_subscribe(&self, ctx: RequestContext) -> JsonRpcResponse {
1156        if let Err(err) = require_initialization(&ctx) {
1157            return error_response(ctx.request.id, err.to_jsonrpc());
1158        }
1159
1160        // Check if resources.subscribe capability is advertised
1161        let caps = self.capabilities.load();
1162        if caps.resources.is_none()
1163            || caps.resources.as_ref().and_then(|r| r.subscribe) != Some(true)
1164        {
1165            return error_response(
1166                ctx.request.id,
1167                McpError::validation(
1168                    "capability_not_supported",
1169                    "Resource subscriptions are not enabled on this server",
1170                )
1171                .to_jsonrpc(),
1172            );
1173        }
1174
1175        // Parse subscribe params
1176        let params = ctx.params().cloned().unwrap_or(Value::Null);
1177        let req: SubscribeRequest = match serde_json::from_value(params) {
1178            Ok(req) => req,
1179            Err(_) => {
1180                return error_response(
1181                    ctx.request.id,
1182                    McpError::validation("invalid_params", "Missing or invalid 'uri' field")
1183                        .to_jsonrpc(),
1184                );
1185            }
1186        };
1187
1188        // Subscribe the session to the resource URI
1189        self.subscription_manager.subscribe(&ctx.session.id, &req.uri);
1190
1191        tracing::debug!(
1192            session_id = %ctx.session.id,
1193            uri = %req.uri,
1194            "Session subscribed to resource"
1195        );
1196
1197        success_response(ctx.request.id, serde_json::json!({}))
1198    }
1199
1200    /// Handle resources/unsubscribe request
1201    ///
1202    /// Unsubscribes the current session from receiving notifications for a resource.
1203    async fn handle_resources_unsubscribe(&self, ctx: RequestContext) -> JsonRpcResponse {
1204        if let Err(err) = require_initialization(&ctx) {
1205            return error_response(ctx.request.id, err.to_jsonrpc());
1206        }
1207
1208        // Parse unsubscribe params
1209        let params = ctx.params().cloned().unwrap_or(Value::Null);
1210        let req: UnsubscribeRequest = match serde_json::from_value(params) {
1211            Ok(req) => req,
1212            Err(_) => {
1213                return error_response(
1214                    ctx.request.id,
1215                    McpError::validation("invalid_params", "Missing or invalid 'uri' field")
1216                        .to_jsonrpc(),
1217                );
1218            }
1219        };
1220
1221        // Unsubscribe the session from the resource URI
1222        self.subscription_manager
1223            .unsubscribe(&ctx.session.id, &req.uri);
1224
1225        tracing::debug!(
1226            session_id = %ctx.session.id,
1227            uri = %req.uri,
1228            "Session unsubscribed from resource"
1229        );
1230
1231        success_response(ctx.request.id, serde_json::json!({}))
1232    }
1233
1234    /// Handle prompts/list request (with pagination support)
1235    async fn handle_prompts_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1236        if let Err(err) = require_initialization(&ctx) {
1237            return error_response(ctx.request.id, err.to_jsonrpc());
1238        }
1239
1240        // Parse cursor if present
1241        let cursor = ctx
1242            .params()
1243            .and_then(|p| p.get("cursor"))
1244            .and_then(|c| c.as_str());
1245
1246        let visibility_ctx = self.visibility_context(&ctx.session);
1247        let all_prompts = self
1248            .prompt_manager
1249            .list_for_session(&ctx.session, &visibility_ctx);
1250
1251        // Apply pagination
1252        let result = crate::utils::paginate(&all_prompts, cursor, crate::utils::DEFAULT_PAGE_SIZE);
1253
1254        // Build response (only include nextCursor if present)
1255        let mut response = serde_json::json!({"prompts": result.items});
1256        if let Some(next) = result.next_cursor {
1257            response["nextCursor"] = serde_json::Value::String(next);
1258        }
1259        success_response(ctx.request.id, response)
1260    }
1261
1262    /// Handle prompts/get request
1263    async fn handle_prompts_get(&self, ctx: RequestContext) -> JsonRpcResponse {
1264        if let Err(err) = require_initialization(&ctx) {
1265            return error_response(ctx.request.id, err.to_jsonrpc());
1266        }
1267
1268        // Parse prompt get params
1269        let params = ctx.params().cloned().unwrap_or(Value::Null);
1270        let prompt_name = match params.get("name").and_then(|v| v.as_str()) {
1271            Some(name) => name,
1272            None => {
1273                return error_response(
1274                    ctx.request.id,
1275                    McpError::validation("invalid_params", "Missing 'name' field").to_jsonrpc(),
1276                );
1277            }
1278        };
1279
1280        let prompt_params = params.get("arguments").cloned().unwrap_or(Value::Null);
1281
1282        // Call prompt
1283        match self
1284            .prompt_manager
1285            .call(prompt_name, prompt_params, &ctx.session, &self.logger)
1286            .await
1287        {
1288            Ok(result) => success_response(
1289                ctx.request.id,
1290                serde_json::to_value(result).expect("Failed to serialize prompt result"),
1291            ),
1292            Err(e) => error_response(
1293                ctx.request.id,
1294                McpError::internal("prompt_get_failed", e.to_string()).to_jsonrpc(),
1295            ),
1296        }
1297    }
1298
1299    /// Handle roots/list request
1300    ///
1301    /// NOTE: This is a CLIENT capability - the server requests roots FROM the client.
1302    /// This handler would only be used if mcphost-rs is acting as a client.
1303    /// For server→client root requests, use `request_roots()` instead.
1304    async fn handle_roots_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1305        if let Err(err) = require_initialization(&ctx) {
1306            return error_response(ctx.request.id, err.to_jsonrpc());
1307        }
1308
1309        // If mcphost-rs is acting as a server (normal case), we don't have roots to provide
1310        // The client provides roots TO us, not the other way around
1311        // Return empty list for now
1312        use crate::protocol::types::ListRootsResult;
1313
1314        let result = ListRootsResult { roots: vec![] };
1315
1316        success_response(
1317            ctx.request.id,
1318            serde_json::to_value(result).expect("Failed to serialize roots list"),
1319        )
1320    }
1321
1322    /// Handle sampling/createMessage request
1323    ///
1324    /// NOTE: This is a CLIENT capability - the server requests LLM completions FROM the client.
1325    /// This is for when the MCP server needs the client to generate LLM responses.
1326    /// This is NOT for the server to generate responses itself.
1327    async fn handle_sampling_create_message(&self, ctx: RequestContext) -> JsonRpcResponse {
1328        if let Err(err) = require_initialization(&ctx) {
1329            return error_response(ctx.request.id, err.to_jsonrpc());
1330        }
1331
1332        // Servers don't generate LLM responses, clients do
1333        // This would only be used if mcphost-rs acts as a client
1334        error_response(
1335            ctx.request.id,
1336            McpError::not_implemented(
1337                "sampling/createMessage is a client capability. Use ClientRequester.create_message() for server→client requests."
1338            ).to_jsonrpc(),
1339        )
1340    }
1341
1342    /// Handle elicitation/create request
1343    ///
1344    /// NOTE: This is a CLIENT capability - the server requests user input FROM the client.
1345    /// This is for when the MCP server needs the client to prompt the user for structured input.
1346    async fn handle_elicitation_create(&self, ctx: RequestContext) -> JsonRpcResponse {
1347        if let Err(err) = require_initialization(&ctx) {
1348            return error_response(ctx.request.id, err.to_jsonrpc());
1349        }
1350
1351        // Servers don't prompt users directly, clients do
1352        // This would only be used if mcphost-rs acts as a client
1353        error_response(
1354            ctx.request.id,
1355            McpError::not_implemented(
1356                "elicitation/create is a client capability. Use ClientRequester.create_elicitation() for server→client requests."
1357            ).to_jsonrpc(),
1358        )
1359    }
1360
1361    /// Handle tasks/get request
1362    async fn handle_tasks_get(&self, ctx: RequestContext) -> JsonRpcResponse {
1363        if let Err(err) = require_initialization(&ctx) {
1364            return error_response(ctx.request.id, err.to_jsonrpc());
1365        }
1366
1367        let params: crate::protocol::types::GetTaskParams = match ctx.params() {
1368            Some(p) => match serde_json::from_value(p.clone()) {
1369                Ok(params) => params,
1370                Err(_) => {
1371                    return error_response(
1372                        ctx.request.id,
1373                        McpError::validation("invalid_params", "Missing or invalid taskId")
1374                            .to_jsonrpc(),
1375                    );
1376                }
1377            },
1378            None => {
1379                return error_response(
1380                    ctx.request.id,
1381                    McpError::validation("invalid_params", "Missing taskId parameter").to_jsonrpc(),
1382                );
1383            }
1384        };
1385
1386        match self
1387            .task_store
1388            .get_task_for_session(&params.task_id, &ctx.session.id)
1389            .await
1390        {
1391            Some(task) => success_response(ctx.request.id, serde_json::to_value(task).unwrap()),
1392            None => error_response(
1393                ctx.request.id,
1394                McpError::validation("invalid_params", "Task not found").to_jsonrpc(),
1395            ),
1396        }
1397    }
1398
1399    /// Handle tasks/result request (blocks until terminal state)
1400    async fn handle_tasks_result(&self, ctx: RequestContext) -> JsonRpcResponse {
1401        if let Err(err) = require_initialization(&ctx) {
1402            return error_response(ctx.request.id, err.to_jsonrpc());
1403        }
1404
1405        let params: crate::protocol::types::GetTaskParams = match ctx.params() {
1406            Some(p) => match serde_json::from_value(p.clone()) {
1407                Ok(params) => params,
1408                Err(_) => {
1409                    return error_response(
1410                        ctx.request.id,
1411                        McpError::validation("invalid_params", "Missing or invalid taskId")
1412                            .to_jsonrpc(),
1413                    );
1414                }
1415            },
1416            None => {
1417                return error_response(
1418                    ctx.request.id,
1419                    McpError::validation("invalid_params", "Missing taskId parameter").to_jsonrpc(),
1420                );
1421            }
1422        };
1423
1424        // Verify session owns task
1425        if self
1426            .task_store
1427            .get_task_for_session(&params.task_id, &ctx.session.id)
1428            .await
1429            .is_none()
1430        {
1431            return error_response(
1432                ctx.request.id,
1433                McpError::validation("invalid_params", "Task not found").to_jsonrpc(),
1434            );
1435        }
1436
1437        // Wait for result (5-minute timeout)
1438        match self
1439            .task_store
1440            .wait_for_result(&params.task_id, std::time::Duration::from_secs(300))
1441            .await
1442        {
1443            Ok(result) => success_response(ctx.request.id, result),
1444            Err(e) => error_response(
1445                ctx.request.id,
1446                McpError::internal("task_error", e.to_string()).to_jsonrpc(),
1447            ),
1448        }
1449    }
1450
1451    /// Handle tasks/list request
1452    async fn handle_tasks_list(&self, ctx: RequestContext) -> JsonRpcResponse {
1453        if let Err(err) = require_initialization(&ctx) {
1454            return error_response(ctx.request.id, err.to_jsonrpc());
1455        }
1456
1457        // Parse cursor if present
1458        let cursor = ctx
1459            .params()
1460            .and_then(|p| p.get("cursor"))
1461            .and_then(|c| c.as_str());
1462
1463        let (tasks, next_cursor) = self
1464            .task_store
1465            .list_tasks(&ctx.session.id, cursor, 100)
1466            .await;
1467
1468        success_response(
1469            ctx.request.id,
1470            serde_json::json!({
1471                "tasks": tasks,
1472                "nextCursor": next_cursor,
1473            }),
1474        )
1475    }
1476
1477    /// Handle tasks/cancel request
1478    async fn handle_tasks_cancel(&self, ctx: RequestContext) -> JsonRpcResponse {
1479        if let Err(err) = require_initialization(&ctx) {
1480            return error_response(ctx.request.id, err.to_jsonrpc());
1481        }
1482
1483        let params: crate::protocol::types::CancelTaskParams = match ctx.params() {
1484            Some(p) => match serde_json::from_value(p.clone()) {
1485                Ok(params) => params,
1486                Err(_) => {
1487                    return error_response(
1488                        ctx.request.id,
1489                        McpError::validation("invalid_params", "Missing or invalid taskId")
1490                            .to_jsonrpc(),
1491                    );
1492                }
1493            },
1494            None => {
1495                return error_response(
1496                    ctx.request.id,
1497                    McpError::validation("invalid_params", "Missing taskId parameter").to_jsonrpc(),
1498                );
1499            }
1500        };
1501
1502        match self
1503            .task_store
1504            .cancel_task(&params.task_id, &ctx.session.id)
1505            .await
1506        {
1507            Ok(task) => success_response(ctx.request.id, serde_json::to_value(task).unwrap()),
1508            Err(e) => {
1509                let error_msg = match e {
1510                    crate::managers::task::TaskError::NotFound(_) => {
1511                        McpError::validation("invalid_params", "Task not found")
1512                    }
1513                    crate::managers::task::TaskError::AlreadyTerminal(status) => {
1514                        McpError::validation(
1515                            "invalid_params",
1516                            format!(
1517                                "Cannot cancel task: already in terminal status '{:?}'",
1518                                status
1519                            ),
1520                        )
1521                    }
1522                    _ => McpError::internal("task_error", e.to_string()),
1523                };
1524                error_response(ctx.request.id, error_msg.to_jsonrpc())
1525            }
1526        }
1527    }
1528
1529    /// Handle completion/complete request
1530    ///
1531    /// Provides argument value completion suggestions for prompts and resources.
1532    /// This allows clients to get autocomplete suggestions while typing arguments.
1533    async fn handle_completion_complete(&self, ctx: RequestContext) -> JsonRpcResponse {
1534        if let Err(err) = require_initialization(&ctx) {
1535            return error_response(ctx.request.id, err.to_jsonrpc());
1536        }
1537
1538        // Check if server has completion capability
1539        let caps = self.capabilities.load();
1540        if caps.completion.is_none() {
1541            return error_response(
1542                ctx.request.id,
1543                McpError::not_implemented("Completion capability not enabled").to_jsonrpc(),
1544            );
1545        }
1546
1547        // Parse completion request
1548        let params = ctx.params().cloned().unwrap_or(Value::Null);
1549        let request: crate::protocol::types::CompleteRequest = match serde_json::from_value(params)
1550        {
1551            Ok(req) => req,
1552            Err(e) => {
1553                return error_response(
1554                    ctx.request.id,
1555                    McpError::validation(
1556                        "invalid_params",
1557                        format!("Invalid completion request: {}", e),
1558                    )
1559                    .to_jsonrpc(),
1560                );
1561            }
1562        };
1563
1564        // Execute completion
1565        match self.completion_manager.complete(request).await {
1566            Ok(result) => success_response(
1567                ctx.request.id,
1568                serde_json::to_value(result).expect("Failed to serialize completion result"),
1569            ),
1570            Err(e) => error_response(
1571                ctx.request.id,
1572                McpError::internal("completion_failed", e.to_string()).to_jsonrpc(),
1573            ),
1574        }
1575    }
1576
1577    /// Handle notifications/cancelled notification
1578    ///
1579    /// This is a notification (no response expected). It signals that the client
1580    /// wants to cancel a previously submitted request. We look up the request ID
1581    /// and signal cancellation to any registered handlers.
1582    async fn handle_notifications_cancelled(&self, ctx: RequestContext) -> JsonRpcResponse {
1583        // Parse cancellation params
1584        let params: CancelledNotificationParams = match ctx.params() {
1585            Some(p) => match serde_json::from_value(p.clone()) {
1586                Ok(params) => params,
1587                Err(e) => {
1588                    tracing::warn!(error = %e, "Invalid notifications/cancelled params");
1589                    // Notifications don't get error responses, just return empty
1590                    return success_response(ctx.request.id, serde_json::json!({}));
1591                }
1592            },
1593            None => {
1594                tracing::warn!("notifications/cancelled missing params");
1595                return success_response(ctx.request.id, serde_json::json!({}));
1596            }
1597        };
1598
1599        let request_id = params.request_id_string();
1600        tracing::debug!(
1601            request_id = %request_id,
1602            reason = ?params.reason,
1603            "Received cancellation request"
1604        );
1605
1606        // Signal cancellation to any registered handlers
1607        self.cancellation_manager.cancel(&request_id, params.reason);
1608
1609        // Notifications don't get responses, but we return an empty success
1610        // in case the run loop tries to write it (it won't for notifications)
1611        success_response(ctx.request.id, serde_json::json!({}))
1612    }
1613}
1614
1615#[cfg(test)]
1616mod tests {
1617    use super::*;
1618
1619    #[tokio::test]
1620    async fn test_server_creation() {
1621        let server = Server::new("test-server", "1.0.0");
1622        assert_eq!(server.name(), "test-server");
1623        assert_eq!(server.version(), "1.0.0");
1624    }
1625
1626    #[tokio::test]
1627    async fn test_ping() {
1628        let server = Server::new("test-server", "1.0.0");
1629
1630        let request = JsonRpcRequest {
1631            jsonrpc: "2.0".to_string(),
1632            id: Some(Value::Number(1.into())),
1633            method: "ping".to_string(),
1634            params: None,
1635        };
1636
1637        let response = server.handle_request("test-session", request).await;
1638
1639        assert!(response.result.is_some());
1640        assert!(response.error.is_none());
1641    }
1642
1643    #[tokio::test]
1644    async fn test_initialize() {
1645        let server = Server::new("test-server", "1.0.0");
1646
1647        let request = JsonRpcRequest {
1648            jsonrpc: "2.0".to_string(),
1649            id: Some(Value::Number(1.into())),
1650            method: "initialize".to_string(),
1651            params: Some(serde_json::json!({
1652                "protocolVersion": "2025-11-25",
1653                "capabilities": {},
1654                "clientInfo": {
1655                    "name": "test-client",
1656                    "version": "1.0.0"
1657                }
1658            })),
1659        };
1660
1661        let response = server.handle_request("test-session", request).await;
1662
1663        assert!(response.result.is_some());
1664        assert!(response.error.is_none());
1665
1666        // Check session was initialized
1667        let session = server.get_session("test-session").unwrap();
1668        assert!(session.is_initialized());
1669        assert_eq!(session.client_info.unwrap().name, "test-client");
1670    }
1671
1672    #[tokio::test]
1673    async fn test_method_not_found() {
1674        let server = Server::new("test-server", "1.0.0");
1675
1676        let request = JsonRpcRequest {
1677            jsonrpc: "2.0".to_string(),
1678            id: Some(Value::Number(1.into())),
1679            method: "unknown/method".to_string(),
1680            params: None,
1681        };
1682
1683        let response = server.handle_request("test-session", request).await;
1684
1685        assert!(response.result.is_none());
1686        assert!(response.error.is_some());
1687        assert_eq!(response.error.unwrap().code, -32601);
1688    }
1689
1690    #[tokio::test]
1691    async fn test_requires_initialization() {
1692        let server = Server::new("test-server", "1.0.0");
1693
1694        let request = JsonRpcRequest {
1695            jsonrpc: "2.0".to_string(),
1696            id: Some(Value::Number(1.into())),
1697            method: "tools/list".to_string(),
1698            params: None,
1699        };
1700
1701        // Should fail without initialization
1702        let response = server.handle_request("test-session", request.clone()).await;
1703        assert!(response.error.is_some());
1704
1705        // Initialize session
1706        let init_request = JsonRpcRequest {
1707            jsonrpc: "2.0".to_string(),
1708            id: Some(Value::Number(2.into())),
1709            method: "initialize".to_string(),
1710            params: Some(serde_json::json!({
1711                "protocolVersion": "2025-11-25",
1712                "capabilities": {},
1713                "clientInfo": {
1714                    "name": "test-client",
1715                    "version": "1.0.0"
1716                }
1717            })),
1718        };
1719        server.handle_request("test-session", init_request).await;
1720
1721        // Should succeed after initialization
1722        let response = server.handle_request("test-session", request).await;
1723        assert!(response.result.is_some());
1724    }
1725
1726    #[tokio::test]
1727    async fn test_session_management() {
1728        let server = Server::new("test-server", "1.0.0");
1729
1730        // Create session
1731        let request = JsonRpcRequest {
1732            jsonrpc: "2.0".to_string(),
1733            id: Some(Value::Number(1.into())),
1734            method: "ping".to_string(),
1735            params: None,
1736        };
1737        server.handle_request("session-1", request).await;
1738
1739        // Session should exist
1740        assert!(server.get_session("session-1").is_some());
1741
1742        // Remove session
1743        let removed = server.remove_session("session-1");
1744        assert!(removed.is_some());
1745
1746        // Session should no longer exist
1747        assert!(server.get_session("session-1").is_none());
1748    }
1749
1750    #[tokio::test]
1751    async fn test_capabilities_update() {
1752        let server = Server::new("test-server", "1.0.0");
1753
1754        let caps = ServerCapabilities {
1755            tools: Some(crate::protocol::capabilities::ToolsCapability {
1756                list_changed: Some(true),
1757            }),
1758            ..Default::default()
1759        };
1760
1761        server.set_capabilities(caps.clone());
1762
1763        let loaded_caps = server.capabilities();
1764        assert_eq!(loaded_caps.tools, caps.tools);
1765    }
1766
1767    // ========================================================================
1768    // Task Integration Tests
1769    // ========================================================================
1770
1771    /// Helper: Initialize a test session
1772    async fn init_test_session(server: &Server, session_id: &str) {
1773        let request = JsonRpcRequest {
1774            jsonrpc: "2.0".to_string(),
1775            id: Some(Value::Number(1.into())),
1776            method: "initialize".to_string(),
1777            params: Some(serde_json::json!({
1778                "protocolVersion": "2025-11-25",
1779                "capabilities": {
1780                    "tasks": {
1781                        "list": {},
1782                        "cancel": {},
1783                        "requests": {
1784                            "tools": {
1785                                "call": {}
1786                            }
1787                        }
1788                    }
1789                },
1790                "clientInfo": {
1791                    "name": "test-client",
1792                    "version": "1.0.0"
1793                }
1794            })),
1795        };
1796
1797        server.handle_request(session_id, request).await;
1798    }
1799
1800    /// Test tool that completes immediately
1801    struct TestTaskTool;
1802
1803    #[async_trait::async_trait]
1804    impl crate::registry::tools::Tool for TestTaskTool {
1805        fn name(&self) -> &str {
1806            "test_task"
1807        }
1808
1809        fn description(&self) -> Option<&str> {
1810            Some("Test tool for task execution")
1811        }
1812
1813        fn input_schema(&self) -> Value {
1814            serde_json::json!({
1815                "type": "object",
1816                "properties": {
1817                    "message": {"type": "string"}
1818                }
1819            })
1820        }
1821
1822        fn execution(&self) -> Option<crate::protocol::types::ToolExecution> {
1823            Some(crate::protocol::types::ToolExecution {
1824                task_support: Some(crate::protocol::types::TaskSupport::Optional),
1825            })
1826        }
1827
1828        async fn execute(
1829            &self,
1830            ctx: crate::prelude::ExecutionContext<'_>,
1831        ) -> Result<crate::registry::tools::ToolOutput, crate::registry::tools::ToolError> {
1832            let msg = ctx
1833                .params
1834                .get("message")
1835                .and_then(|v| v.as_str())
1836                .unwrap_or("default");
1837
1838            Ok(crate::registry::tools::ToolOutput::text(format!(
1839                "Processed: {}",
1840                msg
1841            )))
1842        }
1843    }
1844
1845    /// Test tool that takes time to complete
1846    struct SlowTestTool;
1847
1848    #[async_trait::async_trait]
1849    impl crate::registry::tools::Tool for SlowTestTool {
1850        fn name(&self) -> &str {
1851            "slow_test"
1852        }
1853
1854        fn description(&self) -> Option<&str> {
1855            Some("Slow test tool")
1856        }
1857
1858        fn input_schema(&self) -> Value {
1859            serde_json::json!({"type": "object"})
1860        }
1861
1862        fn execution(&self) -> Option<crate::protocol::types::ToolExecution> {
1863            Some(crate::protocol::types::ToolExecution {
1864                task_support: Some(crate::protocol::types::TaskSupport::Optional),
1865            })
1866        }
1867
1868        async fn execute(
1869            &self,
1870            _ctx: crate::prelude::ExecutionContext<'_>,
1871        ) -> Result<crate::registry::tools::ToolOutput, crate::registry::tools::ToolError> {
1872            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1873            Ok(crate::registry::tools::ToolOutput::text(
1874                "Slow operation complete",
1875            ))
1876        }
1877    }
1878
1879    #[tokio::test]
1880    async fn test_task_augmented_tool_call() {
1881        let server = Server::new("test-server", "1.0.0");
1882        server.tool_registry().register(TestTaskTool);
1883
1884        init_test_session(&server, "test-session").await;
1885
1886        // Call tool with task metadata
1887        let request = JsonRpcRequest {
1888            jsonrpc: "2.0".to_string(),
1889            id: Some(Value::Number(2.into())),
1890            method: "tools/call".to_string(),
1891            params: Some(serde_json::json!({
1892                "name": "test_task",
1893                "arguments": {"message": "hello"},
1894                "task": {"ttl": 60000}
1895            })),
1896        };
1897
1898        let response = server.handle_request("test-session", request).await;
1899
1900        // Should get CreateTaskResult with task metadata
1901        assert!(response.result.is_some());
1902        assert!(response.error.is_none());
1903
1904        let result = response.result.unwrap();
1905        assert!(result.get("task").is_some());
1906
1907        let task = result.get("task").unwrap();
1908        assert!(task.get("taskId").is_some());
1909        assert_eq!(task.get("status").unwrap().as_str().unwrap(), "working");
1910        assert!(task.get("createdAt").is_some());
1911        assert_eq!(task.get("ttl").unwrap().as_u64().unwrap(), 60000);
1912    }
1913
1914    #[tokio::test]
1915    async fn test_task_get_status() {
1916        let server = Server::new("test-server", "1.0.0");
1917        server.tool_registry().register(SlowTestTool);
1918
1919        init_test_session(&server, "test-session").await;
1920
1921        // Create task
1922        let create_request = JsonRpcRequest {
1923            jsonrpc: "2.0".to_string(),
1924            id: Some(Value::Number(2.into())),
1925            method: "tools/call".to_string(),
1926            params: Some(serde_json::json!({
1927                "name": "slow_test",
1928                "arguments": {},
1929                "task": {"ttl": 60000}
1930            })),
1931        };
1932
1933        let create_response = server.handle_request("test-session", create_request).await;
1934        let task_id = create_response.result.unwrap()["task"]["taskId"]
1935            .as_str()
1936            .unwrap()
1937            .to_string();
1938
1939        // Get task status immediately (should be working)
1940        let get_request = JsonRpcRequest {
1941            jsonrpc: "2.0".to_string(),
1942            id: Some(Value::Number(3.into())),
1943            method: "tasks/get".to_string(),
1944            params: Some(serde_json::json!({"taskId": task_id})),
1945        };
1946
1947        let get_response = server.handle_request("test-session", get_request).await;
1948
1949        assert!(get_response.result.is_some());
1950        let result = get_response.result.unwrap();
1951        let status = result["status"].as_str().unwrap();
1952        assert!(status == "working" || status == "completed");
1953    }
1954
1955    #[tokio::test]
1956    async fn test_task_result_blocking() {
1957        let server = Server::new("test-server", "1.0.0");
1958        server.tool_registry().register(SlowTestTool);
1959
1960        init_test_session(&server, "test-session").await;
1961
1962        // Create task
1963        let create_request = JsonRpcRequest {
1964            jsonrpc: "2.0".to_string(),
1965            id: Some(Value::Number(2.into())),
1966            method: "tools/call".to_string(),
1967            params: Some(serde_json::json!({
1968                "name": "slow_test",
1969                "arguments": {},
1970                "task": {"ttl": 60000}
1971            })),
1972        };
1973
1974        let create_response = server.handle_request("test-session", create_request).await;
1975        let task_id = create_response.result.unwrap()["task"]["taskId"]
1976            .as_str()
1977            .unwrap()
1978            .to_string();
1979
1980        // Get result (should block until complete)
1981        let result_request = JsonRpcRequest {
1982            jsonrpc: "2.0".to_string(),
1983            id: Some(Value::Number(3.into())),
1984            method: "tasks/result".to_string(),
1985            params: Some(serde_json::json!({"taskId": task_id})),
1986        };
1987
1988        let result_response = server.handle_request("test-session", result_request).await;
1989
1990        assert!(result_response.result.is_some());
1991        assert!(result_response.error.is_none());
1992
1993        // Should have tool result
1994        let result = result_response.result.unwrap();
1995        assert!(result.get("content").is_some());
1996    }
1997
1998    #[tokio::test]
1999    async fn test_task_cancel() {
2000        let server = Server::new("test-server", "1.0.0");
2001        server.tool_registry().register(SlowTestTool);
2002
2003        init_test_session(&server, "test-session").await;
2004
2005        // Create task
2006        let create_request = JsonRpcRequest {
2007            jsonrpc: "2.0".to_string(),
2008            id: Some(Value::Number(2.into())),
2009            method: "tools/call".to_string(),
2010            params: Some(serde_json::json!({
2011                "name": "slow_test",
2012                "arguments": {},
2013                "task": {"ttl": 60000}
2014            })),
2015        };
2016
2017        let create_response = server.handle_request("test-session", create_request).await;
2018        let task_id = create_response.result.unwrap()["task"]["taskId"]
2019            .as_str()
2020            .unwrap()
2021            .to_string();
2022
2023        // Cancel immediately
2024        let cancel_request = JsonRpcRequest {
2025            jsonrpc: "2.0".to_string(),
2026            id: Some(Value::Number(3.into())),
2027            method: "tasks/cancel".to_string(),
2028            params: Some(serde_json::json!({"taskId": task_id})),
2029        };
2030
2031        let cancel_response = server.handle_request("test-session", cancel_request).await;
2032
2033        // Should succeed if still working
2034        if cancel_response.result.is_some() {
2035            let result = cancel_response.result.unwrap();
2036            let status = result["status"].as_str().unwrap();
2037            assert_eq!(status, "cancelled");
2038        }
2039        // Or fail if already completed (timing-dependent)
2040    }
2041
2042    #[tokio::test]
2043    async fn test_task_list() {
2044        let server = Server::new("test-server", "1.0.0");
2045        server.tool_registry().register(TestTaskTool);
2046
2047        init_test_session(&server, "test-session").await;
2048
2049        // Create multiple tasks
2050        for i in 0..3 {
2051            let request = JsonRpcRequest {
2052                jsonrpc: "2.0".to_string(),
2053                id: Some(Value::Number((i + 2).into())),
2054                method: "tools/call".to_string(),
2055                params: Some(serde_json::json!({
2056                    "name": "test_task",
2057                    "arguments": {"message": format!("task-{}", i)},
2058                    "task": {"ttl": 60000}
2059                })),
2060            };
2061            server.handle_request("test-session", request).await;
2062        }
2063
2064        // List tasks
2065        let list_request = JsonRpcRequest {
2066            jsonrpc: "2.0".to_string(),
2067            id: Some(Value::Number(10.into())),
2068            method: "tasks/list".to_string(),
2069            params: None,
2070        };
2071
2072        let list_response = server.handle_request("test-session", list_request).await;
2073
2074        assert!(list_response.result.is_some());
2075        let result = list_response.result.unwrap();
2076        let tasks = result["tasks"].as_array().unwrap();
2077        assert!(tasks.len() >= 3);
2078    }
2079
2080    #[tokio::test]
2081    async fn test_task_session_isolation() {
2082        let server = Server::new("test-server", "1.0.0");
2083        server.tool_registry().register(TestTaskTool);
2084
2085        init_test_session(&server, "session-1").await;
2086        init_test_session(&server, "session-2").await;
2087
2088        // Create task in session-1
2089        let request = JsonRpcRequest {
2090            jsonrpc: "2.0".to_string(),
2091            id: Some(Value::Number(2.into())),
2092            method: "tools/call".to_string(),
2093            params: Some(serde_json::json!({
2094                "name": "test_task",
2095                "arguments": {"message": "private"},
2096                "task": {"ttl": 60000}
2097            })),
2098        };
2099
2100        let response = server.handle_request("session-1", request).await;
2101        let task_id = response.result.unwrap()["task"]["taskId"]
2102            .as_str()
2103            .unwrap()
2104            .to_string();
2105
2106        // Try to access from session-2 (should fail)
2107        let get_request = JsonRpcRequest {
2108            jsonrpc: "2.0".to_string(),
2109            id: Some(Value::Number(3.into())),
2110            method: "tasks/get".to_string(),
2111            params: Some(serde_json::json!({"taskId": task_id})),
2112        };
2113
2114        let get_response = server.handle_request("session-2", get_request).await;
2115
2116        // Should return error (task not found for this session)
2117        assert!(get_response.error.is_some());
2118    }
2119
2120    #[tokio::test]
2121    async fn test_task_not_found() {
2122        let server = Server::new("test-server", "1.0.0");
2123        init_test_session(&server, "test-session").await;
2124
2125        let request = JsonRpcRequest {
2126            jsonrpc: "2.0".to_string(),
2127            id: Some(Value::Number(2.into())),
2128            method: "tasks/get".to_string(),
2129            params: Some(serde_json::json!({"taskId": "nonexistent-task-id"})),
2130        };
2131
2132        let response = server.handle_request("test-session", request).await;
2133
2134        assert!(response.error.is_some());
2135        assert_eq!(response.error.unwrap().code, -32602);
2136    }
2137
2138    #[tokio::test]
2139    async fn test_task_double_cancel() {
2140        let server = Server::new("test-server", "1.0.0");
2141        server.tool_registry().register(SlowTestTool);
2142
2143        init_test_session(&server, "test-session").await;
2144
2145        // Create task
2146        let create_request = JsonRpcRequest {
2147            jsonrpc: "2.0".to_string(),
2148            id: Some(Value::Number(2.into())),
2149            method: "tools/call".to_string(),
2150            params: Some(serde_json::json!({
2151                "name": "slow_test",
2152                "arguments": {},
2153                "task": {"ttl": 60000}
2154            })),
2155        };
2156
2157        let create_response = server.handle_request("test-session", create_request).await;
2158        let task_id = create_response.result.unwrap()["task"]["taskId"]
2159            .as_str()
2160            .unwrap()
2161            .to_string();
2162
2163        // Cancel first time
2164        let cancel_request = JsonRpcRequest {
2165            jsonrpc: "2.0".to_string(),
2166            id: Some(Value::Number(3.into())),
2167            method: "tasks/cancel".to_string(),
2168            params: Some(serde_json::json!({"taskId": task_id.clone()})),
2169        };
2170
2171        let _ = server
2172            .handle_request("test-session", cancel_request.clone())
2173            .await;
2174
2175        // Wait a bit to ensure cancellation completes
2176        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2177
2178        // Try to cancel again (should fail - already terminal)
2179        let cancel_request2 = JsonRpcRequest {
2180            jsonrpc: "2.0".to_string(),
2181            id: Some(Value::Number(4.into())),
2182            method: "tasks/cancel".to_string(),
2183            params: Some(serde_json::json!({"taskId": task_id})),
2184        };
2185
2186        let cancel_response2 = server.handle_request("test-session", cancel_request2).await;
2187
2188        // Second cancel should fail (already in terminal state)
2189        assert!(cancel_response2.error.is_some());
2190    }
2191
2192    // ========================================================================
2193    // Elicitation Tests
2194    // ========================================================================
2195
2196    #[tokio::test]
2197    async fn test_request_elicitation_no_capability() {
2198        let server = Server::new("test-server", "1.0.0");
2199
2200        // Initialize session WITHOUT elicitation capability
2201        let request = JsonRpcRequest {
2202            jsonrpc: "2.0".to_string(),
2203            id: Some(Value::Number(1.into())),
2204            method: "initialize".to_string(),
2205            params: Some(serde_json::json!({
2206                "protocolVersion": "2025-11-25",
2207                "capabilities": {
2208                    // No elicitation capability
2209                },
2210                "clientInfo": {
2211                    "name": "test-client",
2212                    "version": "1.0.0"
2213                }
2214            })),
2215        };
2216        server.handle_request("test-session", request).await;
2217
2218        // Try to request elicitation
2219        let result = server
2220            .request_elicitation(
2221                "test-session",
2222                "Please provide your email",
2223                serde_json::json!({"type": "object", "properties": {"email": {"type": "string"}}}),
2224                None,
2225            )
2226            .await;
2227
2228        // Should fail with UnsupportedCapability
2229        assert!(matches!(
2230            result,
2231            Err(MultiplexerError::UnsupportedCapability(cap)) if cap == "elicitation"
2232        ));
2233    }
2234
2235    #[tokio::test]
2236    async fn test_request_elicitation_session_not_found() {
2237        let server = Server::new("test-server", "1.0.0");
2238
2239        // Try to request elicitation for non-existent session
2240        let result = server
2241            .request_elicitation(
2242                "nonexistent-session",
2243                "Please provide your email",
2244                serde_json::json!({"type": "object", "properties": {"email": {"type": "string"}}}),
2245                None,
2246            )
2247            .await;
2248
2249        // Should fail with Transport error
2250        assert!(matches!(result, Err(MultiplexerError::Transport(_))));
2251    }
2252
2253    #[tokio::test]
2254    async fn test_client_requester_with_elicitation_capability() {
2255        let server = Server::new("test-server", "1.0.0");
2256
2257        // Initialize session WITH elicitation capability
2258        let request = JsonRpcRequest {
2259            jsonrpc: "2.0".to_string(),
2260            id: Some(Value::Number(1.into())),
2261            method: "initialize".to_string(),
2262            params: Some(serde_json::json!({
2263                "protocolVersion": "2025-11-25",
2264                "capabilities": {
2265                    "elicitation": {}
2266                },
2267                "clientInfo": {
2268                    "name": "test-client",
2269                    "version": "1.0.0"
2270                }
2271            })),
2272        };
2273        server.handle_request("test-session", request).await;
2274
2275        // Create client requester and verify elicitation is supported
2276        let requester = server.create_client_requester("test-session");
2277        assert!(requester.is_some());
2278        let requester = requester.unwrap();
2279        assert!(requester.supports_elicitation());
2280    }
2281
2282    // ========================================================================
2283    // Resource Subscription Tests
2284    // ========================================================================
2285
2286    /// Helper: Initialize a test session with resource subscription capability
2287    async fn init_session_with_subscriptions(server: &Server, session_id: &str) {
2288        // First set server capabilities to support subscriptions
2289        server.set_capabilities(ServerCapabilities {
2290            resources: Some(crate::protocol::capabilities::ResourcesCapability {
2291                subscribe: Some(true),
2292                list_changed: Some(true),
2293                list_templates: Some(true),
2294            }),
2295            ..Default::default()
2296        });
2297
2298        let request = JsonRpcRequest {
2299            jsonrpc: "2.0".to_string(),
2300            id: Some(Value::Number(1.into())),
2301            method: "initialize".to_string(),
2302            params: Some(serde_json::json!({
2303                "protocolVersion": "2025-11-25",
2304                "capabilities": {},
2305                "clientInfo": {
2306                    "name": "test-client",
2307                    "version": "1.0.0"
2308                }
2309            })),
2310        };
2311        server.handle_request(session_id, request).await;
2312    }
2313
2314    #[tokio::test]
2315    async fn test_resources_subscribe() {
2316        let server = Server::new("test-server", "1.0.0");
2317        init_session_with_subscriptions(&server, "test-session").await;
2318
2319        let request = JsonRpcRequest {
2320            jsonrpc: "2.0".to_string(),
2321            id: Some(Value::Number(2.into())),
2322            method: "resources/subscribe".to_string(),
2323            params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2324        };
2325
2326        let response = server.handle_request("test-session", request).await;
2327
2328        assert!(response.result.is_some());
2329        assert!(response.error.is_none());
2330
2331        // Verify subscription was recorded
2332        assert!(server
2333            .subscription_manager()
2334            .is_subscribed("test-session", "file:///test.txt"));
2335    }
2336
2337    #[tokio::test]
2338    async fn test_resources_unsubscribe() {
2339        let server = Server::new("test-server", "1.0.0");
2340        init_session_with_subscriptions(&server, "test-session").await;
2341
2342        // First subscribe
2343        let subscribe_request = JsonRpcRequest {
2344            jsonrpc: "2.0".to_string(),
2345            id: Some(Value::Number(2.into())),
2346            method: "resources/subscribe".to_string(),
2347            params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2348        };
2349        server
2350            .handle_request("test-session", subscribe_request)
2351            .await;
2352
2353        // Now unsubscribe
2354        let unsubscribe_request = JsonRpcRequest {
2355            jsonrpc: "2.0".to_string(),
2356            id: Some(Value::Number(3.into())),
2357            method: "resources/unsubscribe".to_string(),
2358            params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2359        };
2360        let response = server
2361            .handle_request("test-session", unsubscribe_request)
2362            .await;
2363
2364        assert!(response.result.is_some());
2365        assert!(response.error.is_none());
2366
2367        // Verify subscription was removed
2368        assert!(!server
2369            .subscription_manager()
2370            .is_subscribed("test-session", "file:///test.txt"));
2371    }
2372
2373    #[tokio::test]
2374    async fn test_resources_subscribe_without_capability() {
2375        let server = Server::new("test-server", "1.0.0");
2376
2377        // Initialize WITHOUT subscription capability
2378        let request = JsonRpcRequest {
2379            jsonrpc: "2.0".to_string(),
2380            id: Some(Value::Number(1.into())),
2381            method: "initialize".to_string(),
2382            params: Some(serde_json::json!({
2383                "protocolVersion": "2025-11-25",
2384                "capabilities": {},
2385                "clientInfo": {
2386                    "name": "test-client",
2387                    "version": "1.0.0"
2388                }
2389            })),
2390        };
2391        server.handle_request("test-session", request).await;
2392
2393        // Try to subscribe (should fail)
2394        let subscribe_request = JsonRpcRequest {
2395            jsonrpc: "2.0".to_string(),
2396            id: Some(Value::Number(2.into())),
2397            method: "resources/subscribe".to_string(),
2398            params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2399        };
2400        let response = server
2401            .handle_request("test-session", subscribe_request)
2402            .await;
2403
2404        assert!(response.error.is_some());
2405    }
2406
2407    #[tokio::test]
2408    async fn test_session_removal_cleans_subscriptions() {
2409        let server = Server::new("test-server", "1.0.0");
2410        init_session_with_subscriptions(&server, "test-session").await;
2411
2412        // Subscribe to multiple resources
2413        for uri in &["file:///a.txt", "file:///b.txt", "file:///c.txt"] {
2414            let request = JsonRpcRequest {
2415                jsonrpc: "2.0".to_string(),
2416                id: Some(Value::Number(2.into())),
2417                method: "resources/subscribe".to_string(),
2418                params: Some(serde_json::json!({"uri": uri})),
2419            };
2420            server.handle_request("test-session", request).await;
2421        }
2422
2423        // Verify subscriptions exist
2424        assert_eq!(
2425            server
2426                .subscription_manager()
2427                .get_session_subscriptions("test-session")
2428                .len(),
2429            3
2430        );
2431
2432        // Remove session
2433        server.remove_session("test-session");
2434
2435        // Verify subscriptions were cleaned up
2436        assert_eq!(
2437            server
2438                .subscription_manager()
2439                .get_session_subscriptions("test-session")
2440                .len(),
2441            0
2442        );
2443    }
2444
2445    #[tokio::test]
2446    async fn test_notify_resource_updated() {
2447        let server = Server::new("test-server", "1.0.0");
2448        init_session_with_subscriptions(&server, "test-session").await;
2449
2450        // Subscribe
2451        let request = JsonRpcRequest {
2452            jsonrpc: "2.0".to_string(),
2453            id: Some(Value::Number(2.into())),
2454            method: "resources/subscribe".to_string(),
2455            params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2456        };
2457        server.handle_request("test-session", request).await;
2458
2459        // Trigger update notification (this won't actually send anywhere in test,
2460        // but verifies the method exists and doesn't panic)
2461        server.notify_resource_updated("file:///test.txt");
2462
2463        // The notification would be sent to the notification channel
2464        // In a real scenario, the transport would receive it
2465    }
2466
2467    #[tokio::test]
2468    async fn test_subscription_requires_initialization() {
2469        let server = Server::new("test-server", "1.0.0");
2470
2471        // Try to subscribe without initializing
2472        let request = JsonRpcRequest {
2473            jsonrpc: "2.0".to_string(),
2474            id: Some(Value::Number(1.into())),
2475            method: "resources/subscribe".to_string(),
2476            params: Some(serde_json::json!({"uri": "file:///test.txt"})),
2477        };
2478
2479        let response = server.handle_request("test-session", request).await;
2480
2481        // Should fail - not initialized
2482        assert!(response.error.is_some());
2483    }
2484
2485    // ========================================================================
2486    // Completion Tests
2487    // ========================================================================
2488
2489    /// Helper: Initialize a test session with completion capability
2490    async fn init_session_with_completion(server: &Server, session_id: &str) {
2491        // Set server capabilities to support completion
2492        server.set_capabilities(ServerCapabilities {
2493            completion: Some(crate::protocol::capabilities::CompletionCapability {}),
2494            ..Default::default()
2495        });
2496
2497        let request = JsonRpcRequest {
2498            jsonrpc: "2.0".to_string(),
2499            id: Some(Value::Number(1.into())),
2500            method: "initialize".to_string(),
2501            params: Some(serde_json::json!({
2502                "protocolVersion": "2025-11-25",
2503                "capabilities": {},
2504                "clientInfo": {
2505                    "name": "test-client",
2506                    "version": "1.0.0"
2507                }
2508            })),
2509        };
2510        server.handle_request(session_id, request).await;
2511    }
2512
2513    #[tokio::test]
2514    async fn test_completion_complete_prompt() {
2515        use crate::managers::completion::CompletionProvider;
2516        use crate::protocol::types::CompletionValue;
2517        use std::sync::Arc;
2518
2519        let server = Server::new("test-server", "1.0.0");
2520
2521        // Create a test completion provider
2522        struct TestProvider;
2523
2524        #[async_trait::async_trait]
2525        impl CompletionProvider for TestProvider {
2526            async fn complete_prompt(
2527                &self,
2528                prompt_name: &str,
2529                argument_name: &str,
2530                argument_value: &str,
2531            ) -> Vec<CompletionValue> {
2532                if prompt_name == "greet" && argument_name == "name" {
2533                    vec!["Alice", "Amy", "Bob"]
2534                        .into_iter()
2535                        .filter(|v| v.to_lowercase().starts_with(&argument_value.to_lowercase()))
2536                        .map(CompletionValue::new)
2537                        .collect()
2538                } else {
2539                    vec![]
2540                }
2541            }
2542        }
2543
2544        // Register provider
2545        server
2546            .completion_manager()
2547            .register_prompt_provider("greet", Arc::new(TestProvider));
2548
2549        init_session_with_completion(&server, "test-session").await;
2550
2551        // Request completions
2552        let request = JsonRpcRequest {
2553            jsonrpc: "2.0".to_string(),
2554            id: Some(Value::Number(2.into())),
2555            method: "completion/complete".to_string(),
2556            params: Some(serde_json::json!({
2557                "ref": {
2558                    "type": "ref/prompt",
2559                    "name": "greet"
2560                },
2561                "argument": {
2562                    "name": "name",
2563                    "value": "A"
2564                }
2565            })),
2566        };
2567
2568        let response = server.handle_request("test-session", request).await;
2569
2570        assert!(response.result.is_some());
2571        assert!(response.error.is_none());
2572
2573        let result = response.result.unwrap();
2574        let values = result["completion"]["values"].as_array().unwrap();
2575        assert_eq!(values.len(), 2); // Alice, Amy
2576    }
2577
2578    #[tokio::test]
2579    async fn test_completion_complete_no_capability() {
2580        let server = Server::new("test-server", "1.0.0");
2581
2582        // Initialize WITHOUT completion capability (default)
2583        let init_request = JsonRpcRequest {
2584            jsonrpc: "2.0".to_string(),
2585            id: Some(Value::Number(1.into())),
2586            method: "initialize".to_string(),
2587            params: Some(serde_json::json!({
2588                "protocolVersion": "2025-11-25",
2589                "capabilities": {},
2590                "clientInfo": {
2591                    "name": "test-client",
2592                    "version": "1.0.0"
2593                }
2594            })),
2595        };
2596        server.handle_request("test-session", init_request).await;
2597
2598        // Request completions should fail
2599        let request = JsonRpcRequest {
2600            jsonrpc: "2.0".to_string(),
2601            id: Some(Value::Number(2.into())),
2602            method: "completion/complete".to_string(),
2603            params: Some(serde_json::json!({
2604                "ref": {
2605                    "type": "ref/prompt",
2606                    "name": "greet"
2607                },
2608                "argument": {
2609                    "name": "name",
2610                    "value": "A"
2611                }
2612            })),
2613        };
2614
2615        let response = server.handle_request("test-session", request).await;
2616
2617        // Should fail - completion capability not enabled
2618        assert!(response.error.is_some());
2619    }
2620
2621    #[tokio::test]
2622    async fn test_completion_complete_not_initialized() {
2623        let server = Server::new("test-server", "1.0.0");
2624
2625        // Request completions without initialization
2626        let request = JsonRpcRequest {
2627            jsonrpc: "2.0".to_string(),
2628            id: Some(Value::Number(1.into())),
2629            method: "completion/complete".to_string(),
2630            params: Some(serde_json::json!({
2631                "ref": {
2632                    "type": "ref/prompt",
2633                    "name": "greet"
2634                },
2635                "argument": {
2636                    "name": "name",
2637                    "value": "A"
2638                }
2639            })),
2640        };
2641
2642        let response = server.handle_request("test-session", request).await;
2643
2644        // Should fail - not initialized
2645        assert!(response.error.is_some());
2646    }
2647
2648    #[tokio::test]
2649    async fn test_completion_complete_invalid_params() {
2650        let server = Server::new("test-server", "1.0.0");
2651        init_session_with_completion(&server, "test-session").await;
2652
2653        // Request completions with invalid params
2654        let request = JsonRpcRequest {
2655            jsonrpc: "2.0".to_string(),
2656            id: Some(Value::Number(2.into())),
2657            method: "completion/complete".to_string(),
2658            params: Some(serde_json::json!({
2659                "invalid": "params"
2660            })),
2661        };
2662
2663        let response = server.handle_request("test-session", request).await;
2664
2665        // Should fail - invalid params
2666        assert!(response.error.is_some());
2667        let error = response.error.unwrap();
2668        assert_eq!(error.code, -32602); // Invalid params
2669    }
2670
2671    #[tokio::test]
2672    async fn test_completion_complete_resource() {
2673        use crate::managers::completion::CompletionProvider;
2674        use crate::protocol::types::CompletionValue;
2675        use std::sync::Arc;
2676
2677        let server = Server::new("test-server", "1.0.0");
2678
2679        // Create a test completion provider for resources
2680        struct FileProvider;
2681
2682        #[async_trait::async_trait]
2683        impl CompletionProvider for FileProvider {
2684            async fn complete_resource(
2685                &self,
2686                _uri: &str,
2687                _argument_name: &str,
2688                argument_value: &str,
2689            ) -> Vec<CompletionValue> {
2690                vec!["src/main.rs", "src/lib.rs", "tests/test.rs"]
2691                    .into_iter()
2692                    .filter(|v| v.starts_with(argument_value))
2693                    .map(CompletionValue::new)
2694                    .collect()
2695            }
2696        }
2697
2698        // Register provider
2699        server
2700            .completion_manager()
2701            .register_resource_provider("file:///", Arc::new(FileProvider));
2702
2703        init_session_with_completion(&server, "test-session").await;
2704
2705        // Request completions
2706        let request = JsonRpcRequest {
2707            jsonrpc: "2.0".to_string(),
2708            id: Some(Value::Number(2.into())),
2709            method: "completion/complete".to_string(),
2710            params: Some(serde_json::json!({
2711                "ref": {
2712                    "type": "ref/resource",
2713                    "uri": "file:///project"
2714                },
2715                "argument": {
2716                    "name": "path",
2717                    "value": "src/"
2718                }
2719            })),
2720        };
2721
2722        let response = server.handle_request("test-session", request).await;
2723
2724        assert!(response.result.is_some());
2725        assert!(response.error.is_none());
2726
2727        let result = response.result.unwrap();
2728        let values = result["completion"]["values"].as_array().unwrap();
2729        assert_eq!(values.len(), 2); // src/main.rs, src/lib.rs
2730    }
2731}