Skip to main content

agentic_evolve_mcp/protocol/
handler.rs

1//! Main request dispatcher — receives JSON-RPC messages, routes to handlers.
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7use serde_json::Value;
8
9use crate::session::SessionManager;
10use crate::tools::ToolRegistry;
11use crate::types::*;
12
13use super::compact;
14use super::negotiation::NegotiatedCapabilities;
15use super::validator::validate_request;
16
17/// The main protocol handler that dispatches incoming JSON-RPC messages.
18pub struct ProtocolHandler {
19    session: Arc<Mutex<SessionManager>>,
20    capabilities: Arc<Mutex<NegotiatedCapabilities>>,
21    shutdown_requested: Arc<AtomicBool>,
22}
23
24impl ProtocolHandler {
25    /// Create a new protocol handler with the given session manager.
26    pub fn new(session: Arc<Mutex<SessionManager>>) -> Self {
27        Self {
28            session,
29            capabilities: Arc::new(Mutex::new(NegotiatedCapabilities::default())),
30            shutdown_requested: Arc::new(AtomicBool::new(false)),
31        }
32    }
33
34    /// Returns true once a shutdown request has been handled.
35    pub fn shutdown_requested(&self) -> bool {
36        self.shutdown_requested.load(Ordering::Relaxed)
37    }
38
39    /// Handle an incoming JSON-RPC message and optionally return a response.
40    pub async fn handle_message(&self, msg: JsonRpcMessage) -> Option<Value> {
41        match msg {
42            JsonRpcMessage::Request(req) => Some(self.handle_request(req).await),
43            JsonRpcMessage::Notification(notif) => {
44                self.handle_notification(notif).await;
45                None
46            }
47            _ => {
48                // Responses and errors from the client are unexpected
49                tracing::warn!("Received unexpected message type from client");
50                None
51            }
52        }
53    }
54
55    async fn handle_request(&self, request: JsonRpcRequest) -> Value {
56        // Validate JSON-RPC structure
57        if let Err(e) = validate_request(&request) {
58            return serde_json::to_value(e.to_json_rpc_error(request.id)).unwrap_or_default();
59        }
60
61        let id = request.id.clone();
62        let result = self.dispatch_request(&request).await;
63
64        match result {
65            Ok(value) => serde_json::to_value(JsonRpcResponse::new(id, value)).unwrap_or_default(),
66            Err(e) => serde_json::to_value(e.to_json_rpc_error(id)).unwrap_or_default(),
67        }
68    }
69
70    async fn dispatch_request(&self, request: &JsonRpcRequest) -> McpResult<Value> {
71        match request.method.as_str() {
72            // Lifecycle
73            "initialize" => self.handle_initialize(request.params.clone()).await,
74            "shutdown" => self.handle_shutdown().await,
75
76            // Tools
77            "tools/list" => self.handle_tools_list().await,
78            "tools/call" => self.handle_tools_call(request.params.clone()).await,
79
80            // Resources (empty — this server only exposes tools)
81            "resources/list" => {
82                let result = ResourceListResult {
83                    resources: Vec::new(),
84                    next_cursor: None,
85                };
86                serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
87            }
88            "resources/templates/list" => {
89                let result = serde_json::json!({
90                    "resourceTemplates": [],
91                });
92                Ok(result)
93            }
94            "resources/subscribe" => Ok(Value::Object(serde_json::Map::new())),
95            "resources/unsubscribe" => Ok(Value::Object(serde_json::Map::new())),
96
97            // Prompts (empty)
98            "prompts/list" => {
99                let result = PromptListResult {
100                    prompts: Vec::new(),
101                    next_cursor: None,
102                };
103                serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
104            }
105
106            // Ping
107            "ping" => Ok(Value::Object(serde_json::Map::new())),
108
109            _ => Err(McpError::MethodNotFound(request.method.clone())),
110        }
111    }
112
113    async fn handle_notification(&self, notification: JsonRpcNotification) {
114        match notification.method.as_str() {
115            "initialized" | "notifications/initialized" => {
116                let mut caps = self.capabilities.lock().await;
117                if let Err(e) = caps.mark_initialized() {
118                    tracing::error!("Failed to mark initialized: {e}");
119                }
120            }
121            "notifications/cancelled" | "$/cancelRequest" => {
122                tracing::info!("Received cancellation notification");
123            }
124            _ => {
125                tracing::debug!("Unknown notification: {}", notification.method);
126            }
127        }
128    }
129
130    async fn handle_initialize(&self, params: Option<Value>) -> McpResult<Value> {
131        let init_params: InitializeParams = params
132            .map(serde_json::from_value)
133            .transpose()
134            .map_err(|e| McpError::InvalidParams(e.to_string()))?
135            .ok_or_else(|| McpError::InvalidParams("Initialize params required".to_string()))?;
136
137        let mut caps = self.capabilities.lock().await;
138        let result = caps.negotiate(init_params)?;
139
140        serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
141    }
142
143    async fn handle_shutdown(&self) -> McpResult<Value> {
144        tracing::info!("Shutdown requested");
145        self.shutdown_requested.store(true, Ordering::Relaxed);
146        Ok(Value::Object(serde_json::Map::new()))
147    }
148
149    async fn handle_tools_list(&self) -> McpResult<Value> {
150        let tools = if compact::is_compact_mode() {
151            compact::compact_tool_definitions()
152        } else {
153            ToolRegistry::list_tools()
154        };
155        let result = ToolListResult {
156            tools,
157            next_cursor: None,
158        };
159        serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
160    }
161
162    async fn handle_tools_call(&self, params: Option<Value>) -> McpResult<Value> {
163        let call_params: ToolCallParams = params
164            .map(serde_json::from_value)
165            .transpose()
166            .map_err(|e| McpError::InvalidParams(e.to_string()))?
167            .ok_or_else(|| McpError::InvalidParams("Tool call params required".to_string()))?;
168
169        // Normalize compact facade calls to underlying tool names.
170        let (tool_name, arguments) = if compact::is_compact_facade(&call_params.name) {
171            match compact::normalize_compact_call(&call_params.name, &call_params.arguments) {
172                Some((real_name, real_args)) => (real_name, real_args),
173                None => {
174                    return Err(McpError::InvalidParams(
175                        "Invalid operation for compact facade".to_string(),
176                    ));
177                }
178            }
179        } else {
180            (call_params.name, call_params.arguments)
181        };
182
183        // Classify errors: protocol errors (ToolNotFound etc.) become JSON-RPC errors;
184        // tool execution errors become isError: true.
185        let result = match ToolRegistry::call(&tool_name, arguments, &self.session).await {
186            Ok(r) => r,
187            Err(e) if e.is_protocol_error() => return Err(e),
188            Err(e) => ToolCallResult::error(e.to_string()),
189        };
190
191        serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
192    }
193}