agentic_evolve_mcp/protocol/
handler.rs1use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7use serde_json::Value;
8
9use crate::session::SessionManager;
10use crate::tools::ToolRegistry;
11use crate::types::*;
12
13use super::compact;
14use super::negotiation::NegotiatedCapabilities;
15use super::validator::validate_request;
16
17pub struct ProtocolHandler {
19 session: Arc<Mutex<SessionManager>>,
20 capabilities: Arc<Mutex<NegotiatedCapabilities>>,
21 shutdown_requested: Arc<AtomicBool>,
22}
23
24impl ProtocolHandler {
25 pub fn new(session: Arc<Mutex<SessionManager>>) -> Self {
27 Self {
28 session,
29 capabilities: Arc::new(Mutex::new(NegotiatedCapabilities::default())),
30 shutdown_requested: Arc::new(AtomicBool::new(false)),
31 }
32 }
33
34 pub fn shutdown_requested(&self) -> bool {
36 self.shutdown_requested.load(Ordering::Relaxed)
37 }
38
39 pub async fn handle_message(&self, msg: JsonRpcMessage) -> Option<Value> {
41 match msg {
42 JsonRpcMessage::Request(req) => Some(self.handle_request(req).await),
43 JsonRpcMessage::Notification(notif) => {
44 self.handle_notification(notif).await;
45 None
46 }
47 _ => {
48 tracing::warn!("Received unexpected message type from client");
50 None
51 }
52 }
53 }
54
55 async fn handle_request(&self, request: JsonRpcRequest) -> Value {
56 if let Err(e) = validate_request(&request) {
58 return serde_json::to_value(e.to_json_rpc_error(request.id)).unwrap_or_default();
59 }
60
61 let id = request.id.clone();
62 let result = self.dispatch_request(&request).await;
63
64 match result {
65 Ok(value) => serde_json::to_value(JsonRpcResponse::new(id, value)).unwrap_or_default(),
66 Err(e) => serde_json::to_value(e.to_json_rpc_error(id)).unwrap_or_default(),
67 }
68 }
69
70 async fn dispatch_request(&self, request: &JsonRpcRequest) -> McpResult<Value> {
71 match request.method.as_str() {
72 "initialize" => self.handle_initialize(request.params.clone()).await,
74 "shutdown" => self.handle_shutdown().await,
75
76 "tools/list" => self.handle_tools_list().await,
78 "tools/call" => self.handle_tools_call(request.params.clone()).await,
79
80 "resources/list" => {
82 let result = ResourceListResult {
83 resources: Vec::new(),
84 next_cursor: None,
85 };
86 serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
87 }
88 "resources/templates/list" => {
89 let result = serde_json::json!({
90 "resourceTemplates": [],
91 });
92 Ok(result)
93 }
94 "resources/subscribe" => Ok(Value::Object(serde_json::Map::new())),
95 "resources/unsubscribe" => Ok(Value::Object(serde_json::Map::new())),
96
97 "prompts/list" => {
99 let result = PromptListResult {
100 prompts: Vec::new(),
101 next_cursor: None,
102 };
103 serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
104 }
105
106 "ping" => Ok(Value::Object(serde_json::Map::new())),
108
109 _ => Err(McpError::MethodNotFound(request.method.clone())),
110 }
111 }
112
113 async fn handle_notification(&self, notification: JsonRpcNotification) {
114 match notification.method.as_str() {
115 "initialized" | "notifications/initialized" => {
116 let mut caps = self.capabilities.lock().await;
117 if let Err(e) = caps.mark_initialized() {
118 tracing::error!("Failed to mark initialized: {e}");
119 }
120 }
121 "notifications/cancelled" | "$/cancelRequest" => {
122 tracing::info!("Received cancellation notification");
123 }
124 _ => {
125 tracing::debug!("Unknown notification: {}", notification.method);
126 }
127 }
128 }
129
130 async fn handle_initialize(&self, params: Option<Value>) -> McpResult<Value> {
131 let init_params: InitializeParams = params
132 .map(serde_json::from_value)
133 .transpose()
134 .map_err(|e| McpError::InvalidParams(e.to_string()))?
135 .ok_or_else(|| McpError::InvalidParams("Initialize params required".to_string()))?;
136
137 let mut caps = self.capabilities.lock().await;
138 let result = caps.negotiate(init_params)?;
139
140 serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
141 }
142
143 async fn handle_shutdown(&self) -> McpResult<Value> {
144 tracing::info!("Shutdown requested");
145 self.shutdown_requested.store(true, Ordering::Relaxed);
146 Ok(Value::Object(serde_json::Map::new()))
147 }
148
149 async fn handle_tools_list(&self) -> McpResult<Value> {
150 let tools = if compact::is_compact_mode() {
151 compact::compact_tool_definitions()
152 } else {
153 ToolRegistry::list_tools()
154 };
155 let result = ToolListResult {
156 tools,
157 next_cursor: None,
158 };
159 serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
160 }
161
162 async fn handle_tools_call(&self, params: Option<Value>) -> McpResult<Value> {
163 let call_params: ToolCallParams = params
164 .map(serde_json::from_value)
165 .transpose()
166 .map_err(|e| McpError::InvalidParams(e.to_string()))?
167 .ok_or_else(|| McpError::InvalidParams("Tool call params required".to_string()))?;
168
169 let (tool_name, arguments) = if compact::is_compact_facade(&call_params.name) {
171 match compact::normalize_compact_call(&call_params.name, &call_params.arguments) {
172 Some((real_name, real_args)) => (real_name, real_args),
173 None => {
174 return Err(McpError::InvalidParams(
175 "Invalid operation for compact facade".to_string(),
176 ));
177 }
178 }
179 } else {
180 (call_params.name, call_params.arguments)
181 };
182
183 let result = match ToolRegistry::call(&tool_name, arguments, &self.session).await {
186 Ok(r) => r,
187 Err(e) if e.is_protocol_error() => return Err(e),
188 Err(e) => ToolCallResult::error(e.to_string()),
189 };
190
191 serde_json::to_value(result).map_err(|e| McpError::InternalError(e.to_string()))
192 }
193}