smcp_computer/mcp_clients/
http_client.rs1use super::base_client::BaseMCPClient;
11use super::model::*;
12use super::{ResourceCache, SubscriptionManager};
13use crate::desktop::window_uri::{is_window_uri, WindowURI};
14use async_trait::async_trait;
15use reqwest::Client;
16use serde_json;
17use std::time::Duration;
18use tracing::{debug, info, warn};
19
20pub struct HttpMCPClient {
22 base: BaseMCPClient<HttpServerParameters>,
24 http_client: Client,
26 session_id: std::sync::Arc<tokio::sync::Mutex<Option<String>>>,
28 subscription_manager: SubscriptionManager,
30 resource_cache: ResourceCache,
32}
33
34impl std::fmt::Debug for HttpMCPClient {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("HttpMCPClient")
37 .field("url", &self.base.params.url)
38 .field("headers", &self.base.params.headers)
39 .field("state", &self.base.state())
40 .finish()
41 }
42}
43
44impl HttpMCPClient {
45 pub fn new(params: HttpServerParameters) -> Self {
47 let http_client = Client::builder()
48 .timeout(std::time::Duration::from_secs(30))
49 .build()
50 .expect("Failed to create HTTP client");
51
52 Self {
53 base: BaseMCPClient::new(params),
54 http_client,
55 session_id: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
56 subscription_manager: SubscriptionManager::new(),
57 resource_cache: ResourceCache::new(Duration::from_secs(60)), }
59 }
60
61 async fn send_request(
63 &self,
64 method: &str,
65 params: Option<serde_json::Value>,
66 ) -> Result<serde_json::Value, MCPClientError> {
67 let url = &self.base.params.url;
68
69 let mut request_body = serde_json::json!({
70 "jsonrpc": "2.0",
71 "method": method,
72 });
73
74 if let Some(p) = params {
75 request_body["params"] = p;
76 }
77
78 let is_notification = method.starts_with("notifications/");
80
81 if !is_notification {
83 request_body["id"] = serde_json::Value::Number(serde_json::Number::from(
84 std::time::SystemTime::now()
85 .duration_since(std::time::UNIX_EPOCH)
86 .unwrap()
87 .as_secs() as i64,
88 ));
89 }
90
91 debug!("Sending HTTP request to {}: {}", url, request_body);
92
93 let mut request = self.http_client.post(url);
94
95 for (key, value) in &self.base.params.headers {
97 request = request.header(key, value);
98 }
99
100 request = request.header("Content-Type", "application/json");
102 request = request.header("Accept", "application/json, text/event-stream");
103
104 if let Some(ref sid) = *self.session_id.lock().await {
106 request = request.header("Mcp-Session-Id", sid.as_str());
107 }
108
109 let response =
110 request.json(&request_body).send().await.map_err(|e| {
111 MCPClientError::ConnectionError(format!("HTTP request failed: {}", e))
112 })?;
113
114 let status = response.status();
115
116 if status == reqwest::StatusCode::ACCEPTED || status == reqwest::StatusCode::NO_CONTENT {
118 return Ok(serde_json::json!({}));
119 }
120
121 if !status.is_success() {
122 return Err(MCPClientError::ConnectionError(format!(
123 "HTTP error: {}",
124 status
125 )));
126 }
127
128 if let Some(sid) = response
130 .headers()
131 .get("mcp-session-id")
132 .and_then(|v| v.to_str().ok())
133 {
134 *self.session_id.lock().await = Some(sid.to_string());
135 }
136
137 let content_type = response
139 .headers()
140 .get("content-type")
141 .and_then(|v| v.to_str().ok())
142 .unwrap_or("")
143 .to_string();
144
145 let response_body: serde_json::Value = if content_type.contains("text/event-stream") {
146 let text = response.text().await.map_err(|e| {
148 MCPClientError::ProtocolError(format!("Failed to read SSE response: {}", e))
149 })?;
150 parse_sse_response(&text)?
151 } else {
152 response.json().await.map_err(|e| {
153 MCPClientError::ProtocolError(format!("Failed to parse response: {}", e))
154 })?
155 };
156
157 debug!("Received HTTP response: {}", response_body);
158
159 Ok(response_body)
160 }
161
162 async fn initialize_session(&self) -> Result<(), MCPClientError> {
164 let params = serde_json::json!({
165 "protocolVersion": "2024-11-05",
166 "capabilities": {
167 "tools": {},
168 "resources": {}
169 },
170 "clientInfo": {
171 "name": "a2c-smcp-rust",
172 "version": "0.1.0"
173 }
174 });
175
176 let response = self.send_request("initialize", Some(params)).await?;
177
178 if let Some(error) = response.get("error") {
180 return Err(MCPClientError::ProtocolError(format!(
181 "Initialize error: {}",
182 error
183 )));
184 }
185
186 if let Some(result) = response.get("result") {
187 if self.session_id.lock().await.is_none() {
189 if let Some(session_id) = result.get("sessionId").and_then(|v| v.as_str()) {
190 *self.session_id.lock().await = Some(session_id.to_string());
191 }
192 }
193 }
194
195 self.send_request("notifications/initialized", Some(serde_json::json!({})))
197 .await?;
198
199 info!("HTTP session initialized successfully");
200 Ok(())
201 }
202
203 pub async fn is_subscribed(&self, uri: &str) -> bool {
207 self.subscription_manager.is_subscribed(uri).await
208 }
209
210 pub async fn get_subscriptions(&self) -> Vec<String> {
212 self.subscription_manager.get_subscriptions().await
213 }
214
215 pub async fn subscription_count(&self) -> usize {
217 self.subscription_manager.subscription_count().await
218 }
219
220 pub async fn get_cached_resource(&self, uri: &str) -> Option<serde_json::Value> {
224 self.resource_cache.get(uri).await
225 }
226
227 pub async fn has_cache(&self, uri: &str) -> bool {
229 self.resource_cache.contains(uri).await
230 }
231
232 pub async fn cache_size(&self) -> usize {
234 self.resource_cache.size().await
235 }
236
237 pub async fn cleanup_cache(&self) -> usize {
239 self.resource_cache.cleanup_expired().await
240 }
241
242 pub async fn cache_keys(&self) -> Vec<String> {
244 self.resource_cache.keys().await
245 }
246
247 pub async fn clear_cache(&self) {
249 self.resource_cache.clear().await
250 }
251}
252
253#[async_trait]
254impl MCPClientProtocol for HttpMCPClient {
255 fn state(&self) -> ClientState {
256 self.base.state()
257 }
258
259 async fn connect(&self) -> Result<(), MCPClientError> {
260 if !self.base.can_connect().await {
262 return Err(MCPClientError::ConnectionError(format!(
263 "Cannot connect in state: {}",
264 self.base.get_state().await
265 )));
266 }
267
268 self.initialize_session().await?;
270
271 self.base.update_state(ClientState::Connected).await;
273 info!("HTTP client connected successfully");
274
275 Ok(())
276 }
277
278 async fn disconnect(&self) -> Result<(), MCPClientError> {
279 if !self.base.can_disconnect().await {
281 return Err(MCPClientError::ConnectionError(format!(
282 "Cannot disconnect in state: {}",
283 self.base.get_state().await
284 )));
285 }
286
287 if let Err(e) = self.send_request("shutdown", None).await {
289 warn!("Failed to send shutdown request: {}", e);
290 }
291
292 if let Err(e) = self.send_request("exit", None).await {
294 warn!("Failed to send exit notification: {}", e);
295 }
296
297 *self.session_id.lock().await = None;
299
300 self.base.update_state(ClientState::Disconnected).await;
302 info!("HTTP client disconnected successfully");
303
304 Ok(())
305 }
306
307 async fn list_tools(&self) -> Result<Vec<Tool>, MCPClientError> {
308 if self.base.get_state().await != ClientState::Connected {
309 return Err(MCPClientError::ConnectionError("Not connected".to_string()));
310 }
311
312 let response = self
313 .send_request("tools/list", Some(serde_json::json!({})))
314 .await?;
315
316 if let Some(error) = response.get("error") {
317 return Err(MCPClientError::ProtocolError(format!(
318 "List tools error: {}",
319 error
320 )));
321 }
322
323 if let Some(result) = response.get("result") {
324 if let Some(tools) = result.get("tools").and_then(|v| v.as_array()) {
325 let mut tool_list = Vec::new();
326 for tool in tools {
327 if let Ok(parsed_tool) = serde_json::from_value::<Tool>(tool.clone()) {
328 tool_list.push(parsed_tool);
329 }
330 }
331 return Ok(tool_list);
332 }
333 }
334
335 Ok(vec![])
336 }
337
338 async fn call_tool(
339 &self,
340 tool_name: &str,
341 params: serde_json::Value,
342 ) -> Result<CallToolResult, MCPClientError> {
343 if self.base.get_state().await != ClientState::Connected {
344 return Err(MCPClientError::ConnectionError("Not connected".to_string()));
345 }
346
347 let call_params = serde_json::json!({
348 "name": tool_name,
349 "arguments": params
350 });
351
352 let response = self.send_request("tools/call", Some(call_params)).await?;
353
354 if let Some(error) = response.get("error") {
355 return Err(MCPClientError::ProtocolError(format!(
356 "Call tool error: {}",
357 error
358 )));
359 }
360
361 if let Some(result) = response.get("result") {
362 let call_result: CallToolResult = serde_json::from_value(result.clone())?;
363 return Ok(call_result);
364 }
365
366 Err(MCPClientError::ProtocolError(
367 "Invalid response".to_string(),
368 ))
369 }
370
371 async fn list_windows(&self) -> Result<Vec<Resource>, MCPClientError> {
372 if self.base.get_state().await != ClientState::Connected {
373 return Err(MCPClientError::ConnectionError("Not connected".to_string()));
374 }
375
376 let mut all_resources = Vec::new();
378 let mut cursor: Option<String> = None;
379
380 loop {
381 let params = Some(match cursor.as_ref() {
382 Some(c) => serde_json::json!({ "cursor": c }),
383 None => serde_json::json!({}),
384 });
385
386 let response = self.send_request("resources/list", params).await?;
387
388 if let Some(error) = response.get("error") {
389 return Err(MCPClientError::ProtocolError(format!(
390 "List resources error: {}",
391 error
392 )));
393 }
394
395 if let Some(result) = response.get("result") {
396 if let Some(resources) = result.get("resources").and_then(|v| v.as_array()) {
398 for resource in resources {
399 if let Ok(parsed_resource) =
400 serde_json::from_value::<Resource>(resource.clone())
401 {
402 all_resources.push(parsed_resource);
403 }
404 }
405 }
406
407 cursor = result
409 .get("nextCursor")
410 .and_then(|v| v.as_str())
411 .map(|s| s.to_string());
412
413 if cursor.is_none() {
414 break;
415 }
416 } else {
417 break;
418 }
419 }
420
421 let mut filtered_resources: Vec<(Resource, i32)> = Vec::new();
423
424 for resource in all_resources {
425 if !is_window_uri(&resource.uri) {
426 continue;
427 }
428
429 let priority = if let Ok(uri) = WindowURI::new(&resource.uri) {
431 uri.priority().unwrap_or(0)
432 } else {
433 0
434 };
435
436 filtered_resources.push((resource, priority));
437 }
438
439 filtered_resources.sort_by(|a, b| b.1.cmp(&a.1));
441
442 Ok(filtered_resources.into_iter().map(|(r, _)| r).collect())
444 }
445
446 async fn get_window_detail(
447 &self,
448 resource: Resource,
449 ) -> Result<ReadResourceResult, MCPClientError> {
450 if self.base.get_state().await != ClientState::Connected {
451 return Err(MCPClientError::ConnectionError("Not connected".to_string()));
452 }
453
454 let params = serde_json::json!({
455 "uri": resource.uri
456 });
457
458 let response = self.send_request("resources/read", Some(params)).await?;
459
460 if let Some(error) = response.get("error") {
461 return Err(MCPClientError::ProtocolError(format!(
462 "Read resource error: {}",
463 error
464 )));
465 }
466
467 if let Some(result) = response.get("result") {
468 let read_result: ReadResourceResult = serde_json::from_value(result.clone())?;
469 return Ok(read_result);
470 }
471
472 Err(MCPClientError::ProtocolError(
473 "Invalid response".to_string(),
474 ))
475 }
476
477 async fn subscribe_window(&self, resource: Resource) -> Result<(), MCPClientError> {
478 if self.base.get_state().await != ClientState::Connected {
479 return Err(MCPClientError::ConnectionError("Not connected".to_string()));
480 }
481
482 let params = serde_json::json!({
483 "uri": resource.uri
484 });
485
486 let response = self
487 .send_request("resources/subscribe", Some(params))
488 .await?;
489
490 if let Some(error) = response.get("error") {
491 return Err(MCPClientError::ProtocolError(format!(
492 "Subscribe resource error: {}",
493 error
494 )));
495 }
496
497 let _ = self
499 .subscription_manager
500 .add_subscription(resource.uri.clone())
501 .await;
502
503 match self.get_window_detail(resource.clone()).await {
505 Ok(result) => {
506 if !result.contents.is_empty() {
507 if let Ok(json_value) = serde_json::to_value(&result.contents[0]) {
508 self.resource_cache
509 .set(resource.uri.clone(), json_value, None)
510 .await;
511 info!("Subscribed and cached: {}", resource.uri);
512 }
513 }
514 }
515 Err(e) => {
516 warn!("Failed to fetch resource data after subscription: {:?}", e);
517 }
518 }
519
520 Ok(())
521 }
522
523 async fn unsubscribe_window(&self, resource: Resource) -> Result<(), MCPClientError> {
524 if self.base.get_state().await != ClientState::Connected {
525 return Err(MCPClientError::ConnectionError("Not connected".to_string()));
526 }
527
528 let params = serde_json::json!({
529 "uri": resource.uri
530 });
531
532 let response = self
533 .send_request("resources/unsubscribe", Some(params))
534 .await?;
535
536 if let Some(error) = response.get("error") {
537 return Err(MCPClientError::ProtocolError(format!(
538 "Unsubscribe resource error: {}",
539 error
540 )));
541 }
542
543 let _ = self
545 .subscription_manager
546 .remove_subscription(&resource.uri)
547 .await;
548
549 self.resource_cache.remove(&resource.uri).await;
551 info!("Unsubscribed and removed cache: {}", resource.uri);
552
553 Ok(())
554 }
555}
556
557fn parse_sse_response(text: &str) -> Result<serde_json::Value, MCPClientError> {
560 let mut last_json = None;
561 for line in text.lines() {
562 if let Some(data) = line.strip_prefix("data:") {
563 let data = data.trim();
564 if !data.is_empty() {
565 if let Ok(value) = serde_json::from_str::<serde_json::Value>(data) {
566 if value.get("result").is_some() || value.get("error").is_some() {
568 return Ok(value);
569 }
570 last_json = Some(value);
571 }
572 }
573 }
574 }
575 last_json.ok_or_else(|| {
576 MCPClientError::ProtocolError(format!(
577 "No JSON-RPC message found in SSE response: {}",
578 text.chars().take(200).collect::<String>()
579 ))
580 })
581}
582
583#[cfg(test)]
584mod tests {
585 use super::*;
586 use serde_json::json;
587 use std::collections::HashMap;
588
589 #[test]
590 fn test_parse_sse_response_basic() {
591 let sse =
592 "event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
593 let result = parse_sse_response(sse).unwrap();
594 assert!(result.get("result").is_some());
595 }
596
597 #[test]
598 fn test_parse_sse_response_multiple_data_lines() {
599 let sse = "data: {\"jsonrpc\":\"2.0\",\"method\":\"ping\"}\n\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"ok\":true}}\n\n";
600 let result = parse_sse_response(sse).unwrap();
601 assert_eq!(result["result"]["ok"], json!(true));
602 }
603
604 #[test]
605 fn test_parse_sse_response_no_data() {
606 let sse = "event: endpoint\n: comment\n\n";
607 assert!(parse_sse_response(sse).is_err());
608 }
609
610 #[tokio::test]
611 async fn test_http_client_creation() {
612 let params = HttpServerParameters {
613 url: "http://localhost:8080".to_string(),
614 headers: HashMap::new(),
615 };
616
617 let client = HttpMCPClient::new(params);
618 assert_eq!(client.state(), ClientState::Initialized);
619 assert_eq!(client.base.params.url, "http://localhost:8080");
620 }
621
622 #[tokio::test]
623 async fn test_http_client_with_headers() {
624 let mut headers = HashMap::new();
625 headers.insert("Authorization".to_string(), "Bearer token123".to_string());
626 headers.insert("Content-Type".to_string(), "application/json".to_string());
627
628 let params = HttpServerParameters {
629 url: "http://localhost:8080".to_string(),
630 headers,
631 };
632
633 let client = HttpMCPClient::new(params);
634 assert_eq!(
635 client.base.params.headers.get("Authorization"),
636 Some(&"Bearer token123".to_string())
637 );
638 }
639
640 #[tokio::test]
641 async fn test_session_id_management() {
642 let params = HttpServerParameters {
643 url: "http://localhost:8080".to_string(),
644 headers: HashMap::new(),
645 };
646
647 let client = HttpMCPClient::new(params);
648
649 let session_id = client.session_id.lock().await;
651 assert!(session_id.is_none());
652 drop(session_id);
653
654 *client.session_id.lock().await = Some("session123".to_string());
656 let session_id = client.session_id.lock().await;
657 assert_eq!(session_id.as_ref().unwrap(), "session123");
658 }
659
660 #[tokio::test]
661 async fn test_send_request_format() {
662 let params = HttpServerParameters {
663 url: "http://localhost:8080".to_string(),
664 headers: HashMap::new(),
665 };
666
667 let client = HttpMCPClient::new(params);
668
669 let method = "test/method";
675 let params = Some(json!({"param1": "value1"}));
676
677 let result = client.send_request(method, params).await;
680 assert!(result.is_err());
681 assert!(matches!(
682 result.unwrap_err(),
683 MCPClientError::ConnectionError(_)
684 ));
685 }
686
687 #[tokio::test]
688 async fn test_connect_state_checks() {
689 let params = HttpServerParameters {
690 url: "http://localhost:8080".to_string(),
691 headers: HashMap::new(),
692 };
693
694 let client = HttpMCPClient::new(params);
695
696 client.base.update_state(ClientState::Connected).await;
698 let result = client.connect().await;
699 assert!(result.is_err());
700 assert!(matches!(
701 result.unwrap_err(),
702 MCPClientError::ConnectionError(_)
703 ));
704 }
705
706 #[tokio::test]
707 async fn test_disconnect_state_checks() {
708 let params = HttpServerParameters {
709 url: "http://localhost:8080".to_string(),
710 headers: HashMap::new(),
711 };
712
713 let client = HttpMCPClient::new(params);
714
715 let result = client.disconnect().await;
717 assert!(result.is_err());
718 assert!(matches!(
719 result.unwrap_err(),
720 MCPClientError::ConnectionError(_)
721 ));
722 }
723
724 #[tokio::test]
725 async fn test_list_tools_requires_connection() {
726 let params = HttpServerParameters {
727 url: "http://localhost:8080".to_string(),
728 headers: HashMap::new(),
729 };
730
731 let client = HttpMCPClient::new(params);
732
733 let result = client.list_tools().await;
735 assert!(result.is_err());
736 assert!(matches!(
737 result.unwrap_err(),
738 MCPClientError::ConnectionError(_)
739 ));
740 }
741
742 #[tokio::test]
743 async fn test_call_tool_requires_connection() {
744 let params = HttpServerParameters {
745 url: "http://localhost:8080".to_string(),
746 headers: HashMap::new(),
747 };
748
749 let client = HttpMCPClient::new(params);
750
751 let result = client.call_tool("test_tool", json!({})).await;
753 assert!(result.is_err());
754 assert!(matches!(
755 result.unwrap_err(),
756 MCPClientError::ConnectionError(_)
757 ));
758 }
759
760 #[tokio::test]
761 async fn test_list_windows_requires_connection() {
762 let params = HttpServerParameters {
763 url: "http://localhost:8080".to_string(),
764 headers: HashMap::new(),
765 };
766
767 let client = HttpMCPClient::new(params);
768
769 let result = client.list_windows().await;
771 assert!(result.is_err());
772 assert!(matches!(
773 result.unwrap_err(),
774 MCPClientError::ConnectionError(_)
775 ));
776 }
777
778 #[tokio::test]
779 async fn test_get_window_detail_requires_connection() {
780 let params = HttpServerParameters {
781 url: "http://localhost:8080".to_string(),
782 headers: HashMap::new(),
783 };
784
785 let client = HttpMCPClient::new(params);
786
787 let resource = Resource {
788 uri: "window://123".to_string(),
789 name: "Test Window".to_string(),
790 description: None,
791 mime_type: None,
792 };
793
794 let result = client.get_window_detail(resource).await;
796 assert!(result.is_err());
797 assert!(matches!(
798 result.unwrap_err(),
799 MCPClientError::ConnectionError(_)
800 ));
801 }
802
803 #[tokio::test]
804 async fn test_initialize_session_request_format() {
805 let params = HttpServerParameters {
806 url: "http://localhost:8080".to_string(),
807 headers: HashMap::new(),
808 };
809
810 let client = HttpMCPClient::new(params);
811
812 let result = client.initialize_session().await;
814 assert!(result.is_err());
815 }
816
817 #[tokio::test]
818 async fn test_disconnect_cleanup() {
819 let params = HttpServerParameters {
820 url: "http://localhost:8080".to_string(),
821 headers: HashMap::new(),
822 };
823
824 let client = HttpMCPClient::new(params);
825
826 *client.session_id.lock().await = Some("session123".to_string());
828
829 client.base.update_state(ClientState::Connected).await;
831
832 let _ = client.disconnect().await;
834
835 let session_id = client.session_id.lock().await;
837 assert!(session_id.is_none());
838
839 assert_eq!(client.base.get_state().await, ClientState::Disconnected);
841 }
842
843 #[tokio::test]
844 async fn test_error_handling_in_list_tools() {
845 let params = HttpServerParameters {
846 url: "http://localhost:8080".to_string(),
847 headers: HashMap::new(),
848 };
849
850 let client = HttpMCPClient::new(params);
851
852 client.base.update_state(ClientState::Connected).await;
854
855 let result = client.list_tools().await;
857 assert!(result.is_err());
858 }
859
860 #[tokio::test]
861 async fn test_error_handling_in_call_tool() {
862 let params = HttpServerParameters {
863 url: "http://localhost:8080".to_string(),
864 headers: HashMap::new(),
865 };
866
867 let client = HttpMCPClient::new(params);
868
869 client.base.update_state(ClientState::Connected).await;
871
872 let result = client
874 .call_tool("test_tool", json!({"param": "value"}))
875 .await;
876 assert!(result.is_err());
877 }
878
879 #[tokio::test]
880 async fn test_http_client_debug_format() {
881 let params = HttpServerParameters {
882 url: "http://localhost:8080".to_string(),
883 headers: HashMap::new(),
884 };
885
886 let client = HttpMCPClient::new(params);
887
888 let debug_str = format!("{:?}", client);
890 assert!(debug_str.contains("HttpMCPClient"));
891 }
892}