1pub mod security;
13
14use crate::dht::{Key, DHT};
15use crate::{PeerId, Result, P2PError};
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value};
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime, Instant};
21use tokio::sync::{RwLock, mpsc, oneshot};
22use tokio::time::timeout;
23use tracing::{debug, info};
24use rand;
25
26pub use security::*;
27
28pub const MCP_VERSION: &str = "2024-11-05";
30
31pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
33
34pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);
36
37pub const MCP_PROTOCOL: &str = "/p2p-foundation/mcp/1.0.0";
39
40pub const SERVICE_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum MCPMessage {
47 Initialize {
49 protocol_version: String,
51 capabilities: MCPCapabilities,
53 client_info: MCPClientInfo,
55 },
56 InitializeResult {
58 protocol_version: String,
60 capabilities: MCPCapabilities,
62 server_info: MCPServerInfo,
64 },
65 ListTools {
67 cursor: Option<String>,
69 },
70 ListToolsResult {
72 tools: Vec<MCPTool>,
74 next_cursor: Option<String>,
76 },
77 CallTool {
79 name: String,
81 arguments: Value,
83 },
84 CallToolResult {
86 content: Vec<MCPContent>,
88 is_error: bool,
90 },
91 ListPrompts {
93 cursor: Option<String>,
95 },
96 ListPromptsResult {
98 prompts: Vec<MCPPrompt>,
100 next_cursor: Option<String>,
102 },
103 GetPrompt {
105 name: String,
107 arguments: Option<Value>,
109 },
110 GetPromptResult {
112 description: Option<String>,
114 messages: Vec<MCPPromptMessage>,
116 },
117 ListResources {
119 cursor: Option<String>,
121 },
122 ListResourcesResult {
124 resources: Vec<MCPResource>,
126 next_cursor: Option<String>,
128 },
129 ReadResource {
131 uri: String,
133 },
134 ReadResourceResult {
136 contents: Vec<MCPResourceContent>,
138 },
139 SubscribeResource {
141 uri: String,
143 },
144 UnsubscribeResource {
146 uri: String,
148 },
149 ResourceUpdated {
151 uri: String,
153 },
154 ListLogs {
156 cursor: Option<String>,
158 },
159 ListLogsResult {
161 logs: Vec<MCPLogEntry>,
163 next_cursor: Option<String>,
165 },
166 SetLogLevel {
168 level: MCPLogLevel,
170 },
171 Error {
173 code: i32,
175 message: String,
177 data: Option<Value>,
179 },
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct MCPCapabilities {
185 pub experimental: Option<Value>,
187 pub sampling: Option<Value>,
189 pub tools: Option<MCPToolsCapability>,
191 pub prompts: Option<MCPPromptsCapability>,
193 pub resources: Option<MCPResourcesCapability>,
195 pub logging: Option<MCPLoggingCapability>,
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct MCPToolsCapability {
202 pub list_changed: Option<bool>,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct MCPPromptsCapability {
209 pub list_changed: Option<bool>,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct MCPResourcesCapability {
216 pub subscribe: Option<bool>,
218 pub list_changed: Option<bool>,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct MCPLoggingCapability {
225 pub levels: Option<Vec<MCPLogLevel>>,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
231pub struct MCPClientInfo {
232 pub name: String,
234 pub version: String,
236}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct MCPServerInfo {
241 pub name: String,
243 pub version: String,
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct MCPTool {
250 pub name: String,
252 pub description: String,
254 pub input_schema: Value,
256}
257
258pub struct Tool {
260 pub definition: MCPTool,
262 pub handler: Box<dyn ToolHandler + Send + Sync>,
264 pub metadata: ToolMetadata,
266}
267
268#[derive(Debug, Clone)]
270pub struct ToolMetadata {
271 pub created_at: SystemTime,
273 pub last_called: Option<SystemTime>,
275 pub call_count: u64,
277 pub avg_execution_time: Duration,
279 pub health_status: ToolHealthStatus,
281 pub tags: Vec<String>,
283}
284
285#[derive(Debug, Clone, Copy, PartialEq)]
287pub enum ToolHealthStatus {
288 Healthy,
290 Degraded,
292 Unhealthy,
294 Disabled,
296}
297
298pub trait ToolHandler {
300 fn execute(&self, arguments: Value) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>>;
302
303 fn validate(&self, arguments: &Value) -> Result<()> {
305 let _ = arguments;
307 Ok(())
308 }
309
310 fn get_requirements(&self) -> ToolRequirements {
312 ToolRequirements::default()
313 }
314}
315
316#[derive(Debug, Clone)]
318pub struct ToolRequirements {
319 pub max_memory: Option<u64>,
321 pub max_execution_time: Option<Duration>,
323 pub required_capabilities: Vec<String>,
325 pub requires_network: bool,
327 pub requires_filesystem: bool,
329}
330
331impl Default for ToolRequirements {
332 fn default() -> Self {
333 Self {
334 max_memory: Some(100 * 1024 * 1024), max_execution_time: Some(Duration::from_secs(30)),
336 required_capabilities: Vec::new(),
337 requires_network: false,
338 requires_filesystem: false,
339 }
340 }
341}
342
343#[derive(Debug, Clone, Serialize, Deserialize)]
345#[serde(tag = "type", rename_all = "snake_case")]
346pub enum MCPContent {
347 Text {
349 text: String,
351 },
352 Image {
354 data: String,
356 mime_type: String,
358 },
359 Resource {
361 resource: MCPResourceReference,
363 },
364}
365
366#[derive(Debug, Clone, Serialize, Deserialize)]
368pub struct MCPResourceReference {
369 pub uri: String,
371 pub type_: Option<String>,
373}
374
375#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct MCPPrompt {
378 pub name: String,
380 pub description: Option<String>,
382 pub arguments: Option<Value>,
384}
385
386#[derive(Debug, Clone, Serialize, Deserialize)]
388pub struct MCPPromptMessage {
389 pub role: MCPRole,
391 pub content: MCPContent,
393}
394
395#[derive(Debug, Clone, Serialize, Deserialize)]
397#[serde(rename_all = "snake_case")]
398pub enum MCPRole {
399 User,
401 Assistant,
403 System,
405}
406
407#[derive(Debug, Clone, Serialize, Deserialize)]
409pub struct MCPResource {
410 pub uri: String,
412 pub name: String,
414 pub description: Option<String>,
416 pub mime_type: Option<String>,
418}
419
420#[derive(Debug, Clone, Serialize, Deserialize)]
422pub struct MCPResourceContent {
423 pub uri: String,
425 pub mime_type: String,
427 pub text: Option<String>,
429 pub blob: Option<String>,
431}
432
433#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
435#[serde(rename_all = "snake_case")]
436pub enum MCPLogLevel {
437 Debug,
439 Info,
441 Notice,
443 Warning,
445 Error,
447 Critical,
449 Alert,
451 Emergency,
453}
454
455#[derive(Debug, Clone, Serialize, Deserialize)]
457pub struct MCPLogEntry {
458 pub level: MCPLogLevel,
460 pub data: Value,
462 pub logger: Option<String>,
464}
465
466#[derive(Debug, Clone, Serialize, Deserialize)]
468pub struct MCPService {
469 pub service_id: String,
471 pub node_id: PeerId,
473 pub tools: Vec<String>,
475 pub capabilities: MCPCapabilities,
477 pub metadata: MCPServiceMetadata,
479 pub registered_at: SystemTime,
481 pub endpoint: MCPEndpoint,
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
487pub struct MCPServiceMetadata {
488 pub name: String,
490 pub version: String,
492 pub description: Option<String>,
494 pub tags: Vec<String>,
496 pub health_status: ServiceHealthStatus,
498 pub load_metrics: ServiceLoadMetrics,
500}
501
502#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
504pub enum ServiceHealthStatus {
505 Healthy,
507 Degraded,
509 Unhealthy,
511 Maintenance,
513}
514
515#[derive(Debug, Clone, Serialize, Deserialize)]
517pub struct ServiceLoadMetrics {
518 pub active_requests: u32,
520 pub requests_per_second: f64,
522 pub avg_response_time_ms: f64,
524 pub error_rate: f64,
526 pub cpu_usage: f64,
528 pub memory_usage: u64,
530}
531
532#[derive(Debug, Clone, Serialize, Deserialize)]
534pub struct MCPEndpoint {
535 pub protocol: String,
537 pub address: String,
539 pub port: Option<u16>,
541 pub tls: bool,
543 pub auth_required: bool,
545}
546
547#[derive(Debug, Clone)]
549pub struct MCPRequest {
550 pub request_id: String,
552 pub source_peer: PeerId,
554 pub target_peer: PeerId,
556 pub message: MCPMessage,
558 pub timestamp: SystemTime,
560 pub timeout: Duration,
562 pub auth_token: Option<String>,
564}
565
566#[derive(Debug, Clone, Serialize, Deserialize)]
568pub struct P2PMCPMessage {
569 pub message_type: P2PMCPMessageType,
571 pub message_id: String,
573 pub source_peer: PeerId,
575 pub target_peer: Option<PeerId>,
577 pub timestamp: u64,
579 pub payload: MCPMessage,
581 pub ttl: u8,
583}
584
585#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
587pub enum P2PMCPMessageType {
588 Request,
590 Response,
592 ServiceAdvertisement,
594 ServiceDiscovery,
596}
597
598#[derive(Debug, Clone)]
600pub struct MCPResponse {
601 pub request_id: String,
603 pub message: MCPMessage,
605 pub timestamp: SystemTime,
607 pub processing_time: Duration,
609}
610
611#[derive(Debug, Clone)]
613pub struct MCPCallContext {
614 pub caller_id: PeerId,
616 pub timestamp: SystemTime,
618 pub timeout: Duration,
620 pub auth_info: Option<MCPAuthInfo>,
622 pub metadata: HashMap<String, String>,
624}
625
626#[derive(Debug, Clone)]
628pub struct MCPAuthInfo {
629 pub token: String,
631 pub token_type: String,
633 pub expires_at: Option<SystemTime>,
635 pub permissions: Vec<String>,
637}
638
639#[derive(Debug, Clone, Serialize, Deserialize)]
641pub struct MCPServerConfig {
642 pub server_name: String,
644 pub server_version: String,
646 pub enable_dht_discovery: bool,
648 pub max_concurrent_requests: usize,
650 pub request_timeout: Duration,
652 pub enable_auth: bool,
654 pub enable_rate_limiting: bool,
656 pub rate_limit_rpm: u32,
658 pub enable_logging: bool,
660 pub max_tool_execution_time: Duration,
662 pub tool_memory_limit: u64,
664}
665
666impl Default for MCPServerConfig {
667 fn default() -> Self {
668 Self {
669 server_name: "P2P-MCP-Server".to_string(),
670 server_version: crate::VERSION.to_string(),
671 enable_dht_discovery: true,
672 max_concurrent_requests: 100,
673 request_timeout: DEFAULT_CALL_TIMEOUT,
674 enable_auth: true,
675 enable_rate_limiting: true,
676 rate_limit_rpm: 60,
677 enable_logging: true,
678 max_tool_execution_time: Duration::from_secs(30),
679 tool_memory_limit: 100 * 1024 * 1024, }
681 }
682}
683
684pub struct MCPServer {
686 config: MCPServerConfig,
688 tools: Arc<RwLock<HashMap<String, Tool>>>,
690 #[allow(dead_code)]
692 prompts: Arc<RwLock<HashMap<String, MCPPrompt>>>,
693 #[allow(dead_code)]
695 resources: Arc<RwLock<HashMap<String, MCPResource>>>,
696 sessions: Arc<RwLock<HashMap<String, MCPSession>>>,
698 request_handlers: Arc<RwLock<HashMap<String, oneshot::Sender<MCPResponse>>>>,
700 dht: Option<Arc<RwLock<DHT>>>,
702 local_services: Arc<RwLock<HashMap<String, MCPService>>>,
704 remote_services: Arc<RwLock<HashMap<String, MCPService>>>,
706 stats: Arc<RwLock<MCPServerStats>>,
708 request_tx: mpsc::UnboundedSender<MCPRequest>,
710 #[allow(dead_code)]
712 response_rx: Arc<RwLock<mpsc::UnboundedReceiver<MCPResponse>>>,
713 security_manager: Option<Arc<MCPSecurityManager>>,
715 audit_logger: Arc<SecurityAuditLogger>,
717}
718
719#[derive(Debug, Clone)]
721pub struct MCPSession {
722 pub session_id: String,
724 pub peer_id: PeerId,
726 pub client_capabilities: Option<MCPCapabilities>,
728 pub started_at: SystemTime,
730 pub last_activity: SystemTime,
732 pub state: MCPSessionState,
734 pub subscribed_resources: Vec<String>,
736}
737
738#[derive(Debug, Clone, PartialEq)]
740pub enum MCPSessionState {
741 Initializing,
743 Active,
745 Inactive,
747 Terminated,
749}
750
751#[derive(Debug, Clone)]
753pub struct MCPServerStats {
754 pub total_requests: u64,
756 pub total_responses: u64,
758 pub total_errors: u64,
760 pub avg_response_time: Duration,
762 pub active_sessions: u32,
764 pub total_tools: u32,
766 pub popular_tools: HashMap<String, u64>,
768 pub server_started_at: SystemTime,
770}
771
772impl Default for MCPServerStats {
773 fn default() -> Self {
774 Self {
775 total_requests: 0,
776 total_responses: 0,
777 total_errors: 0,
778 avg_response_time: Duration::from_millis(0),
779 active_sessions: 0,
780 total_tools: 0,
781 popular_tools: HashMap::new(),
782 server_started_at: SystemTime::now(),
783 }
784 }
785}
786
787impl MCPServer {
788 pub fn new(config: MCPServerConfig) -> Self {
790 let (request_tx, _request_rx) = mpsc::unbounded_channel();
791 let (_response_tx, response_rx) = mpsc::unbounded_channel();
792
793 let security_manager = if config.enable_auth {
795 let secret_key = (0..32).map(|_| rand::random::<u8>()).collect();
797 Some(Arc::new(MCPSecurityManager::new(secret_key, config.rate_limit_rpm)))
798 } else {
799 None
800 };
801
802 let server = Self {
803 config,
804 tools: Arc::new(RwLock::new(HashMap::new())),
805 prompts: Arc::new(RwLock::new(HashMap::new())),
806 resources: Arc::new(RwLock::new(HashMap::new())),
807 sessions: Arc::new(RwLock::new(HashMap::new())),
808 request_handlers: Arc::new(RwLock::new(HashMap::new())),
809 dht: None,
810 local_services: Arc::new(RwLock::new(HashMap::new())),
811 remote_services: Arc::new(RwLock::new(HashMap::new())),
812 stats: Arc::new(RwLock::new(MCPServerStats::default())),
813 request_tx,
814 response_rx: Arc::new(RwLock::new(response_rx)),
815 security_manager,
816 audit_logger: Arc::new(SecurityAuditLogger::new(10000)), };
818
819 server
820 }
821
822 pub fn with_dht(mut self, dht: Arc<RwLock<DHT>>) -> Self {
824 self.dht = Some(dht);
825 self
826 }
827
828 pub async fn start(&self) -> Result<()> {
830 info!("Starting MCP server: {}", self.config.server_name);
831
832 self.start_request_processor().await?;
834
835 if self.dht.is_some() {
837 self.start_service_discovery().await?;
838 }
839
840 self.start_health_monitor().await?;
842
843 info!("MCP server started successfully");
844 Ok(())
845 }
846
847 pub async fn register_tool(&self, tool: Tool) -> Result<()> {
849 let tool_name = tool.definition.name.clone();
850
851 self.validate_tool(&tool).await?;
853
854 {
856 let mut tools = self.tools.write().await;
857 tools.insert(tool_name.clone(), tool);
858 }
859
860 {
862 let mut stats = self.stats.write().await;
863 stats.total_tools += 1;
864 }
865
866 if let Some(dht) = &self.dht {
868 self.register_tool_in_dht(&tool_name, dht).await?;
869 }
870
871 info!("Registered tool: {}", tool_name);
872 Ok(())
873 }
874
875 async fn validate_tool(&self, tool: &Tool) -> Result<()> {
877 let tools = self.tools.read().await;
879 if tools.contains_key(&tool.definition.name) {
880 return Err(P2PError::MCP(format!("Tool already exists: {}", tool.definition.name)).into());
881 }
882
883 if tool.definition.name.is_empty() || tool.definition.name.len() > 100 {
885 return Err(P2PError::MCP("Invalid tool name".to_string()).into());
886 }
887
888 if !tool.definition.input_schema.is_object() {
890 return Err(P2PError::MCP("Tool input schema must be an object".to_string()).into());
891 }
892
893 Ok(())
894 }
895
896 async fn register_tool_in_dht(&self, tool_name: &str, dht: &Arc<RwLock<DHT>>) -> Result<()> {
898 let key = Key::new(format!("mcp:tool:{}", tool_name).as_bytes());
899 let service_info = json!({
900 "tool_name": tool_name,
901 "node_id": "local_node", "registered_at": SystemTime::now().duration_since(std::time::UNIX_EPOCH).map_err(|e| P2PError::Network(format!("Time error: {}", e)))?.as_secs(),
903 "capabilities": self.get_server_capabilities().await
904 });
905
906 let dht_guard = dht.read().await;
907 dht_guard.put(key, serde_json::to_vec(&service_info)?).await?;
908
909 Ok(())
910 }
911
912 async fn get_server_capabilities(&self) -> MCPCapabilities {
914 MCPCapabilities {
915 experimental: None,
916 sampling: None,
917 tools: Some(MCPToolsCapability {
918 list_changed: Some(true),
919 }),
920 prompts: Some(MCPPromptsCapability {
921 list_changed: Some(true),
922 }),
923 resources: Some(MCPResourcesCapability {
924 subscribe: Some(true),
925 list_changed: Some(true),
926 }),
927 logging: Some(MCPLoggingCapability {
928 levels: Some(vec![
929 MCPLogLevel::Debug,
930 MCPLogLevel::Info,
931 MCPLogLevel::Warning,
932 MCPLogLevel::Error,
933 ]),
934 }),
935 }
936 }
937
938 pub async fn call_tool(&self, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
940 let start_time = Instant::now();
941
942 if !self.check_rate_limit(&context.caller_id).await? {
946 return Err(P2PError::MCP("Rate limit exceeded".to_string()));
947 }
948
949 if !self.check_permission(&context.caller_id, &MCPPermission::ExecuteTools).await? {
951 return Err(P2PError::MCP("Permission denied: execute tools".to_string()));
952 }
953
954 let tool_security_level = self.get_tool_security_policy(tool_name).await;
956 let is_trusted = self.is_trusted_peer(&context.caller_id).await;
957
958 match tool_security_level {
959 SecurityLevel::Admin => {
960 if !self.check_permission(&context.caller_id, &MCPPermission::Admin).await? {
961 return Err(P2PError::MCP("Permission denied: admin access required".to_string()));
962 }
963 }
964 SecurityLevel::Strong => {
965 if !is_trusted {
966 return Err(P2PError::MCP("Permission denied: trusted peer required".to_string()));
967 }
968 }
969 SecurityLevel::Basic => {
970 if self.config.enable_auth {
972 if let Some(auth_info) = &context.auth_info {
973 self.verify_auth_token(&auth_info.token).await?;
974 } else {
975 return Err(P2PError::MCP("Authentication required".to_string()));
976 }
977 }
978 }
979 SecurityLevel::Public => {
980 }
982 }
983
984 let mut details = HashMap::new();
986 details.insert("action".to_string(), "tool_call".to_string());
987 details.insert("tool_name".to_string(), tool_name.to_string());
988 details.insert("security_level".to_string(), format!("{:?}", tool_security_level));
989
990 self.audit_logger.log_event(
991 "tool_execution".to_string(),
992 context.caller_id.clone(),
993 details,
994 AuditSeverity::Info,
995 ).await;
996
997 let tool_exists = {
999 let tools = self.tools.read().await;
1000 tools.contains_key(tool_name)
1001 };
1002
1003 if !tool_exists {
1004 return Err(P2PError::MCP(format!("Tool not found: {}", tool_name)).into());
1005 }
1006
1007 let requirements = {
1009 let tools = self.tools.read().await;
1010 let tool = tools.get(tool_name).unwrap(); if let Err(e) = tool.handler.validate(&arguments) {
1014 return Err(P2PError::MCP(format!("Tool validation failed: {}", e)).into());
1015 }
1016
1017 tool.handler.get_requirements()
1019 };
1020
1021 self.check_resource_requirements(&requirements).await?;
1023
1024 let tools_clone = self.tools.clone();
1026 let tool_name_owned = tool_name.to_string();
1027 let execution_timeout = context.timeout.min(requirements.max_execution_time.unwrap_or(context.timeout));
1028
1029 let result = timeout(execution_timeout, async move {
1030 let tools = tools_clone.read().await;
1031 let tool = tools.get(&tool_name_owned).unwrap(); tool.handler.execute(arguments).await
1033 }).await
1034 .map_err(|_| P2PError::MCP("Tool execution timeout".to_string()))?
1035 .map_err(|e| P2PError::MCP(format!("Tool execution failed: {}", e)))?;
1036
1037 let execution_time = start_time.elapsed();
1038
1039 self.update_tool_stats(tool_name, execution_time, true).await;
1041
1042 {
1044 let mut stats = self.stats.write().await;
1045 stats.total_requests += 1;
1046 stats.total_responses += 1;
1047
1048 let new_total_time = stats.avg_response_time.mul_f64(stats.total_responses as f64 - 1.0) + execution_time;
1050 stats.avg_response_time = new_total_time.div_f64(stats.total_responses as f64);
1051
1052 *stats.popular_tools.entry(tool_name.to_string()).or_insert(0) += 1;
1054 }
1055
1056 debug!("Tool '{}' executed in {:?}", tool_name, execution_time);
1057 Ok(result)
1058 }
1059
1060 async fn check_resource_requirements(&self, requirements: &ToolRequirements) -> Result<()> {
1062 if let Some(max_memory) = requirements.max_memory {
1064 if max_memory > self.config.tool_memory_limit {
1065 return Err(P2PError::MCP("Tool memory requirement exceeds limit".to_string()).into());
1066 }
1067 }
1068
1069 if let Some(max_execution_time) = requirements.max_execution_time {
1071 if max_execution_time > self.config.max_tool_execution_time {
1072 return Err(P2PError::MCP("Tool execution time requirement exceeds limit".to_string()).into());
1073 }
1074 }
1075
1076 Ok(())
1079 }
1080
1081 async fn update_tool_stats(&self, tool_name: &str, execution_time: Duration, success: bool) {
1083 let mut tools = self.tools.write().await;
1084 if let Some(tool) = tools.get_mut(tool_name) {
1085 tool.metadata.call_count += 1;
1086 tool.metadata.last_called = Some(SystemTime::now());
1087
1088 let new_total_time = tool.metadata.avg_execution_time.mul_f64(tool.metadata.call_count as f64 - 1.0) + execution_time;
1090 tool.metadata.avg_execution_time = new_total_time.div_f64(tool.metadata.call_count as f64);
1091
1092 if !success {
1094 tool.metadata.health_status = match tool.metadata.health_status {
1095 ToolHealthStatus::Healthy => ToolHealthStatus::Degraded,
1096 ToolHealthStatus::Degraded => ToolHealthStatus::Unhealthy,
1097 other => other,
1098 };
1099 } else if tool.metadata.health_status != ToolHealthStatus::Disabled {
1100 tool.metadata.health_status = ToolHealthStatus::Healthy;
1101 }
1102 }
1103 }
1104
1105 pub async fn list_tools(&self, _cursor: Option<String>) -> Result<(Vec<MCPTool>, Option<String>)> {
1107 let tools = self.tools.read().await;
1108 let tool_definitions: Vec<MCPTool> = tools.values()
1109 .map(|tool| tool.definition.clone())
1110 .collect();
1111
1112 Ok((tool_definitions, None))
1115 }
1116
1117 async fn start_request_processor(&self) -> Result<()> {
1119 let _request_tx = self.request_tx.clone();
1120 let _server_clone = Arc::new(self);
1121
1122 tokio::spawn(async move {
1123 info!("MCP request processor started");
1124
1125 loop {
1130 tokio::time::sleep(Duration::from_millis(100)).await;
1133
1134 break;
1137 }
1138
1139 info!("MCP request processor stopped");
1140 });
1141
1142 Ok(())
1143 }
1144
1145 async fn start_service_discovery(&self) -> Result<()> {
1147 if let Some(dht) = self.dht.clone() {
1148 let _stats = self.stats.clone();
1149 let remote_services = self.remote_services.clone();
1150
1151 tokio::spawn(async move {
1152 info!("MCP service discovery started");
1153
1154 loop {
1155 tokio::time::sleep(SERVICE_DISCOVERY_INTERVAL).await;
1157
1158 let key = Key::new(b"mcp:services");
1160 let dht_guard = dht.read().await;
1161
1162 match dht_guard.get(&key).await {
1163 Some(record) => {
1164 match serde_json::from_slice::<Vec<MCPService>>(&record.value) {
1165 Ok(services) => {
1166 debug!("Discovered {} MCP services", services.len());
1167
1168 {
1170 let mut remote_cache = remote_services.write().await;
1171 for service in services {
1172 remote_cache.insert(service.service_id.clone(), service);
1173 }
1174 }
1175 }
1176 Err(e) => {
1177 debug!("Failed to deserialize services: {}", e);
1178 }
1179 }
1180 }
1181 None => {
1182 debug!("No MCP services found in DHT");
1183 }
1184 }
1185 }
1186 });
1187 }
1188
1189 Ok(())
1190 }
1191
1192 async fn start_health_monitor(&self) -> Result<()> {
1194 Ok(())
1197 }
1198
1199 pub async fn get_stats(&self) -> MCPServerStats {
1201 self.stats.read().await.clone()
1202 }
1203
1204 pub async fn discover_remote_services(&self) -> Result<Vec<MCPService>> {
1206 if let Some(dht) = &self.dht {
1207 let key = Key::new(b"mcp:services");
1208 let dht_guard = dht.read().await;
1209
1210 match dht_guard.get(&key).await {
1211 Some(record) => {
1212 match serde_json::from_slice::<Vec<MCPService>>(&record.value) {
1213 Ok(services) => {
1214 {
1216 let mut remote_services = self.remote_services.write().await;
1217 for service in &services {
1218 remote_services.insert(service.service_id.clone(), service.clone());
1219 }
1220 }
1221 Ok(services)
1222 }
1223 Err(e) => {
1224 debug!("Failed to deserialize services: {}", e);
1225 Ok(Vec::new())
1226 }
1227 }
1228 }
1229 None => Ok(Vec::new()),
1230 }
1231 } else {
1232 Ok(Vec::new())
1233 }
1234 }
1235
1236 pub async fn call_remote_tool(&self, peer_id: &PeerId, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
1238 let request_id = uuid::Uuid::new_v4().to_string();
1239
1240 let mcp_message = MCPMessage::CallTool {
1242 name: tool_name.to_string(),
1243 arguments,
1244 };
1245
1246 let p2p_message = P2PMCPMessage {
1248 message_type: P2PMCPMessageType::Request,
1249 message_id: request_id.clone(),
1250 source_peer: context.caller_id.clone(),
1251 target_peer: Some(peer_id.clone()),
1252 timestamp: SystemTime::now()
1253 .duration_since(std::time::UNIX_EPOCH)
1254 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1255 .as_secs(),
1256 payload: mcp_message,
1257 ttl: 5, };
1259
1260 let message_data = serde_json::to_vec(&p2p_message)
1262 .map_err(|e| P2PError::Serialization(e))?;
1263
1264 if message_data.len() > MAX_MESSAGE_SIZE {
1265 return Err(P2PError::MCP("Message too large".to_string()));
1266 }
1267
1268 let (response_tx, _response_rx) = oneshot::channel::<MCPResponse>();
1270
1271 {
1273 let mut handlers = self.request_handlers.write().await;
1274 handlers.insert(request_id.clone(), response_tx);
1275 }
1276
1277 Err(P2PError::MCP("Remote tool calling requires P2P network integration".to_string()))
1280 }
1281
1282 pub async fn handle_p2p_message(&self, message_data: &[u8], source_peer: &PeerId) -> Result<Option<Vec<u8>>> {
1284 let p2p_message: P2PMCPMessage = serde_json::from_slice(message_data)
1286 .map_err(|e| P2PError::Serialization(e))?;
1287
1288 debug!("Received MCP message from {}: {:?}", source_peer, p2p_message.message_type);
1289
1290 match p2p_message.message_type {
1291 P2PMCPMessageType::Request => {
1292 self.handle_remote_request(p2p_message).await
1293 }
1294 P2PMCPMessageType::Response => {
1295 self.handle_remote_response(p2p_message).await?;
1296 Ok(None) }
1298 P2PMCPMessageType::ServiceAdvertisement => {
1299 self.handle_service_advertisement(p2p_message).await?;
1300 Ok(None)
1301 }
1302 P2PMCPMessageType::ServiceDiscovery => {
1303 self.handle_service_discovery(p2p_message).await
1304 }
1305 }
1306 }
1307
1308 async fn handle_remote_request(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
1310 match message.payload {
1311 MCPMessage::CallTool { name, arguments } => {
1312 let context = MCPCallContext {
1313 caller_id: message.source_peer.clone(),
1314 timestamp: SystemTime::now(),
1315 timeout: DEFAULT_CALL_TIMEOUT,
1316 auth_info: None,
1317 metadata: HashMap::new(),
1318 };
1319
1320 let result = self.call_tool(&name, arguments, context).await;
1322
1323 let response_payload = match result {
1325 Ok(value) => MCPMessage::CallToolResult {
1326 content: vec![MCPContent::Text { text: value.to_string() }],
1327 is_error: false,
1328 },
1329 Err(e) => MCPMessage::Error {
1330 code: -1,
1331 message: e.to_string(),
1332 data: None,
1333 },
1334 };
1335
1336 let response_message = P2PMCPMessage {
1337 message_type: P2PMCPMessageType::Response,
1338 message_id: message.message_id,
1339 source_peer: "local".to_string(), target_peer: Some(message.source_peer),
1341 timestamp: SystemTime::now()
1342 .duration_since(std::time::UNIX_EPOCH)
1343 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1344 .as_secs(),
1345 payload: response_payload,
1346 ttl: message.ttl.saturating_sub(1),
1347 };
1348
1349 let response_data = serde_json::to_vec(&response_message)
1351 .map_err(|e| P2PError::Serialization(e))?;
1352
1353 Ok(Some(response_data))
1354 }
1355 MCPMessage::ListTools { cursor: _ } => {
1356 let (tools, _) = self.list_tools(None).await?;
1357
1358 let response_payload = MCPMessage::ListToolsResult {
1359 tools,
1360 next_cursor: None,
1361 };
1362
1363 let response_message = P2PMCPMessage {
1364 message_type: P2PMCPMessageType::Response,
1365 message_id: message.message_id,
1366 source_peer: "local".to_string(), target_peer: Some(message.source_peer),
1368 timestamp: SystemTime::now()
1369 .duration_since(std::time::UNIX_EPOCH)
1370 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1371 .as_secs(),
1372 payload: response_payload,
1373 ttl: message.ttl.saturating_sub(1),
1374 };
1375
1376 let response_data = serde_json::to_vec(&response_message)
1377 .map_err(|e| P2PError::Serialization(e))?;
1378
1379 Ok(Some(response_data))
1380 }
1381 _ => {
1382 let error_response = P2PMCPMessage {
1384 message_type: P2PMCPMessageType::Response,
1385 message_id: message.message_id,
1386 source_peer: "local".to_string(), target_peer: Some(message.source_peer),
1388 timestamp: SystemTime::now()
1389 .duration_since(std::time::UNIX_EPOCH)
1390 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1391 .as_secs(),
1392 payload: MCPMessage::Error {
1393 code: -2,
1394 message: "Unsupported request type".to_string(),
1395 data: None,
1396 },
1397 ttl: message.ttl.saturating_sub(1),
1398 };
1399
1400 let response_data = serde_json::to_vec(&error_response)
1401 .map_err(|e| P2PError::Serialization(e))?;
1402
1403 Ok(Some(response_data))
1404 }
1405 }
1406 }
1407
1408 pub async fn generate_auth_token(&self, peer_id: &PeerId, permissions: Vec<MCPPermission>, ttl: Duration) -> Result<String> {
1412 if let Some(security_manager) = &self.security_manager {
1413 let token = security_manager.generate_token(peer_id, permissions, ttl).await?;
1414
1415 let mut details = HashMap::new();
1417 details.insert("action".to_string(), "token_generated".to_string());
1418 details.insert("ttl_seconds".to_string(), ttl.as_secs().to_string());
1419
1420 self.audit_logger.log_event(
1421 "authentication".to_string(),
1422 peer_id.clone(),
1423 details,
1424 AuditSeverity::Info,
1425 ).await;
1426
1427 Ok(token)
1428 } else {
1429 Err(P2PError::MCP("Authentication not enabled".to_string()))
1430 }
1431 }
1432
1433 pub async fn verify_auth_token(&self, token: &str) -> Result<TokenPayload> {
1435 if let Some(security_manager) = &self.security_manager {
1436 match security_manager.verify_token(token).await {
1437 Ok(payload) => {
1438 let mut details = HashMap::new();
1440 details.insert("action".to_string(), "token_verified".to_string());
1441 details.insert("subject".to_string(), payload.sub.clone());
1442
1443 self.audit_logger.log_event(
1444 "authentication".to_string(),
1445 payload.iss.clone(),
1446 details,
1447 AuditSeverity::Info,
1448 ).await;
1449
1450 Ok(payload)
1451 }
1452 Err(e) => {
1453 let mut details = HashMap::new();
1455 details.insert("action".to_string(), "token_verification_failed".to_string());
1456 details.insert("error".to_string(), e.to_string());
1457
1458 self.audit_logger.log_event(
1459 "authentication".to_string(),
1460 "unknown".to_string(),
1461 details,
1462 AuditSeverity::Warning,
1463 ).await;
1464
1465 Err(e)
1466 }
1467 }
1468 } else {
1469 Err(P2PError::MCP("Authentication not enabled".to_string()))
1470 }
1471 }
1472
1473 pub async fn check_permission(&self, peer_id: &PeerId, permission: &MCPPermission) -> Result<bool> {
1475 if let Some(security_manager) = &self.security_manager {
1476 security_manager.check_permission(peer_id, permission).await
1477 } else {
1478 Ok(true)
1480 }
1481 }
1482
1483 pub async fn check_rate_limit(&self, peer_id: &PeerId) -> Result<bool> {
1485 if let Some(security_manager) = &self.security_manager {
1486 let allowed = security_manager.check_rate_limit(peer_id).await?;
1487
1488 if !allowed {
1489 let mut details = HashMap::new();
1491 details.insert("action".to_string(), "rate_limit_exceeded".to_string());
1492
1493 self.audit_logger.log_event(
1494 "rate_limiting".to_string(),
1495 peer_id.clone(),
1496 details,
1497 AuditSeverity::Warning,
1498 ).await;
1499 }
1500
1501 Ok(allowed)
1502 } else {
1503 Ok(true)
1505 }
1506 }
1507
1508 pub async fn grant_permission(&self, peer_id: &PeerId, permission: MCPPermission) -> Result<()> {
1510 if let Some(security_manager) = &self.security_manager {
1511 security_manager.grant_permission(peer_id, permission.clone()).await?;
1512
1513 let mut details = HashMap::new();
1515 details.insert("action".to_string(), "permission_granted".to_string());
1516 details.insert("permission".to_string(), permission.as_str().to_string());
1517
1518 self.audit_logger.log_event(
1519 "authorization".to_string(),
1520 peer_id.clone(),
1521 details,
1522 AuditSeverity::Info,
1523 ).await;
1524
1525 Ok(())
1526 } else {
1527 Err(P2PError::MCP("Security not enabled".to_string()))
1528 }
1529 }
1530
1531 pub async fn revoke_permission(&self, peer_id: &PeerId, permission: &MCPPermission) -> Result<()> {
1533 if let Some(security_manager) = &self.security_manager {
1534 security_manager.revoke_permission(peer_id, permission).await?;
1535
1536 let mut details = HashMap::new();
1538 details.insert("action".to_string(), "permission_revoked".to_string());
1539 details.insert("permission".to_string(), permission.as_str().to_string());
1540
1541 self.audit_logger.log_event(
1542 "authorization".to_string(),
1543 peer_id.clone(),
1544 details,
1545 AuditSeverity::Info,
1546 ).await;
1547
1548 Ok(())
1549 } else {
1550 Err(P2PError::MCP("Security not enabled".to_string()))
1551 }
1552 }
1553
1554 pub async fn add_trusted_peer(&self, peer_id: PeerId) -> Result<()> {
1556 if let Some(security_manager) = &self.security_manager {
1557 security_manager.add_trusted_peer(peer_id.clone()).await?;
1558
1559 let mut details = HashMap::new();
1561 details.insert("action".to_string(), "trusted_peer_added".to_string());
1562
1563 self.audit_logger.log_event(
1564 "trust_management".to_string(),
1565 peer_id,
1566 details,
1567 AuditSeverity::Info,
1568 ).await;
1569
1570 Ok(())
1571 } else {
1572 Err(P2PError::MCP("Security not enabled".to_string()))
1573 }
1574 }
1575
1576 pub async fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
1578 if let Some(security_manager) = &self.security_manager {
1579 security_manager.is_trusted_peer(peer_id).await
1580 } else {
1581 false
1582 }
1583 }
1584
1585 pub async fn set_tool_security_policy(&self, tool_name: String, level: SecurityLevel) -> Result<()> {
1587 if let Some(security_manager) = &self.security_manager {
1588 security_manager.set_tool_policy(tool_name.clone(), level.clone()).await?;
1589
1590 let mut details = HashMap::new();
1592 details.insert("action".to_string(), "tool_policy_set".to_string());
1593 details.insert("tool_name".to_string(), tool_name);
1594 details.insert("security_level".to_string(), format!("{:?}", level));
1595
1596 self.audit_logger.log_event(
1597 "security_policy".to_string(),
1598 "system".to_string(),
1599 details,
1600 AuditSeverity::Info,
1601 ).await;
1602
1603 Ok(())
1604 } else {
1605 Err(P2PError::MCP("Security not enabled".to_string()))
1606 }
1607 }
1608
1609 pub async fn get_tool_security_policy(&self, tool_name: &str) -> SecurityLevel {
1611 if let Some(security_manager) = &self.security_manager {
1612 security_manager.get_tool_policy(tool_name).await
1613 } else {
1614 SecurityLevel::Public
1615 }
1616 }
1617
1618 pub async fn get_peer_security_stats(&self, peer_id: &PeerId) -> Option<PeerACL> {
1620 if let Some(security_manager) = &self.security_manager {
1621 security_manager.get_peer_stats(peer_id).await
1622 } else {
1623 None
1624 }
1625 }
1626
1627 pub async fn get_security_audit(&self, limit: Option<usize>) -> Vec<SecurityAuditEntry> {
1629 self.audit_logger.get_recent_entries(limit).await
1630 }
1631
1632 pub async fn security_cleanup(&self) -> Result<()> {
1634 if let Some(security_manager) = &self.security_manager {
1635 security_manager.cleanup().await?;
1636 }
1637 Ok(())
1638 }
1639
1640 async fn handle_remote_response(&self, message: P2PMCPMessage) -> Result<()> {
1642 let response_tx = {
1644 let mut handlers = self.request_handlers.write().await;
1645 handlers.remove(&message.message_id)
1646 };
1647
1648 if let Some(tx) = response_tx {
1649 let response = MCPResponse {
1650 request_id: message.message_id,
1651 message: message.payload,
1652 timestamp: SystemTime::now(),
1653 processing_time: Duration::from_millis(0), };
1655
1656 let _ = tx.send(response);
1658 } else {
1659 debug!("Received response for unknown request: {}", message.message_id);
1660 }
1661
1662 Ok(())
1663 }
1664
1665 async fn handle_service_advertisement(&self, _message: P2PMCPMessage) -> Result<()> {
1667 Ok(())
1669 }
1670
1671 async fn handle_service_discovery(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
1673 let local_services: Vec<MCPService> = {
1675 let services = self.local_services.read().await;
1676 services.values().cloned().collect()
1677 };
1678
1679 if !local_services.is_empty() {
1680 let advertisement = P2PMCPMessage {
1681 message_type: P2PMCPMessageType::ServiceAdvertisement,
1682 message_id: uuid::Uuid::new_v4().to_string(),
1683 source_peer: "local".to_string(), target_peer: Some(message.source_peer),
1685 timestamp: SystemTime::now()
1686 .duration_since(std::time::UNIX_EPOCH)
1687 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1688 .as_secs(),
1689 payload: MCPMessage::ListToolsResult {
1690 tools: local_services.into_iter()
1691 .flat_map(|s| s.tools.into_iter().map(|t| MCPTool {
1692 name: t,
1693 description: "Remote tool".to_string(),
1694 input_schema: json!({"type": "object"}),
1695 }))
1696 .collect(),
1697 next_cursor: None,
1698 },
1699 ttl: message.ttl.saturating_sub(1),
1700 };
1701
1702 let response_data = serde_json::to_vec(&advertisement)
1703 .map_err(|e| P2PError::Serialization(e))?;
1704
1705 Ok(Some(response_data))
1706 } else {
1707 Ok(None)
1708 }
1709 }
1710
1711 pub async fn shutdown(&self) -> Result<()> {
1713 info!("Shutting down MCP server");
1714
1715 {
1717 let mut sessions = self.sessions.write().await;
1718 for session in sessions.values_mut() {
1719 session.state = MCPSessionState::Terminated;
1720 }
1721 sessions.clear();
1722 }
1723
1724 info!("MCP server shutdown complete");
1727 Ok(())
1728 }
1729}
1730
1731impl Tool {
1732 pub fn new(name: &str, description: &str, input_schema: Value) -> ToolBuilder {
1734 ToolBuilder {
1735 name: name.to_string(),
1736 description: description.to_string(),
1737 input_schema,
1738 handler: None,
1739 tags: Vec::new(),
1740 }
1741 }
1742}
1743
1744pub struct ToolBuilder {
1746 name: String,
1747 description: String,
1748 input_schema: Value,
1749 handler: Option<Box<dyn ToolHandler + Send + Sync>>,
1750 tags: Vec<String>,
1751}
1752
1753impl ToolBuilder {
1754 pub fn handler<H: ToolHandler + Send + Sync + 'static>(mut self, handler: H) -> Self {
1756 self.handler = Some(Box::new(handler));
1757 self
1758 }
1759
1760 pub fn tags(mut self, tags: Vec<String>) -> Self {
1762 self.tags = tags;
1763 self
1764 }
1765
1766 pub fn build(self) -> Result<Tool> {
1768 let handler = self.handler
1769 .ok_or_else(|| P2PError::MCP("Tool handler is required".to_string()))?;
1770
1771 let definition = MCPTool {
1772 name: self.name,
1773 description: self.description,
1774 input_schema: self.input_schema,
1775 };
1776
1777 let metadata = ToolMetadata {
1778 created_at: SystemTime::now(),
1779 last_called: None,
1780 call_count: 0,
1781 avg_execution_time: Duration::from_millis(0),
1782 health_status: ToolHealthStatus::Healthy,
1783 tags: self.tags,
1784 };
1785
1786 Ok(Tool {
1787 definition,
1788 handler,
1789 metadata,
1790 })
1791 }
1792}
1793
1794pub struct FunctionToolHandler<F> {
1796 function: F,
1797}
1798
1799impl<F, Fut> ToolHandler for FunctionToolHandler<F>
1800where
1801 F: Fn(Value) -> Fut + Send + Sync,
1802 Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
1803{
1804 fn execute(&self, arguments: Value) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>> {
1805 Box::pin((self.function)(arguments))
1806 }
1807}
1808
1809impl<F> FunctionToolHandler<F> {
1810 pub fn new(function: F) -> Self {
1812 Self { function }
1813 }
1814}
1815
1816impl MCPService {
1818 pub fn new(service_id: String, node_id: PeerId) -> Self {
1820 Self {
1821 service_id,
1822 node_id,
1823 tools: Vec::new(),
1824 capabilities: MCPCapabilities {
1825 experimental: None,
1826 sampling: None,
1827 tools: Some(MCPToolsCapability {
1828 list_changed: Some(true),
1829 }),
1830 prompts: None,
1831 resources: None,
1832 logging: None,
1833 },
1834 metadata: MCPServiceMetadata {
1835 name: "MCP Service".to_string(),
1836 version: "1.0.0".to_string(),
1837 description: None,
1838 tags: Vec::new(),
1839 health_status: ServiceHealthStatus::Healthy,
1840 load_metrics: ServiceLoadMetrics {
1841 active_requests: 0,
1842 requests_per_second: 0.0,
1843 avg_response_time_ms: 0.0,
1844 error_rate: 0.0,
1845 cpu_usage: 0.0,
1846 memory_usage: 0,
1847 },
1848 },
1849 registered_at: SystemTime::now(),
1850 endpoint: MCPEndpoint {
1851 protocol: "p2p".to_string(),
1852 address: "".to_string(),
1853 port: None,
1854 tls: false,
1855 auth_required: false,
1856 },
1857 }
1858 }
1859}
1860
1861impl Default for MCPCapabilities {
1862 fn default() -> Self {
1863 Self {
1864 experimental: None,
1865 sampling: None,
1866 tools: Some(MCPToolsCapability {
1867 list_changed: Some(true),
1868 }),
1869 prompts: Some(MCPPromptsCapability {
1870 list_changed: Some(true),
1871 }),
1872 resources: Some(MCPResourcesCapability {
1873 subscribe: Some(true),
1874 list_changed: Some(true),
1875 }),
1876 logging: Some(MCPLoggingCapability {
1877 levels: Some(vec![
1878 MCPLogLevel::Debug,
1879 MCPLogLevel::Info,
1880 MCPLogLevel::Warning,
1881 MCPLogLevel::Error,
1882 ]),
1883 }),
1884 }
1885 }
1886}
1887
1888#[cfg(test)]
1889mod tests {
1890 use super::*;
1891 use crate::dht::{DHT, DHTConfig, Key};
1892 use std::pin::Pin;
1893 use std::future::Future;
1894 use tokio::time::timeout;
1895
1896 struct TestTool {
1898 name: String,
1899 should_error: bool,
1900 execution_time: Duration,
1901 }
1902
1903 impl TestTool {
1904 fn new(name: &str) -> Self {
1905 Self {
1906 name: name.to_string(),
1907 should_error: false,
1908 execution_time: Duration::from_millis(10),
1909 }
1910 }
1911
1912 fn with_error(mut self) -> Self {
1913 self.should_error = true;
1914 self
1915 }
1916
1917 fn with_execution_time(mut self, duration: Duration) -> Self {
1918 self.execution_time = duration;
1919 self
1920 }
1921 }
1922
1923 impl ToolHandler for TestTool {
1924 fn execute(&self, arguments: Value) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
1925 let should_error = self.should_error;
1926 let execution_time = self.execution_time;
1927 let name = self.name.clone();
1928
1929 Box::pin(async move {
1930 tokio::time::sleep(execution_time).await;
1931
1932 if should_error {
1933 return Err(P2PError::MCP(format!("Test error from tool {}", name)).into());
1934 }
1935
1936 Ok(json!({
1938 "tool": name,
1939 "arguments": arguments,
1940 "result": "success"
1941 }))
1942 })
1943 }
1944
1945 fn validate(&self, arguments: &Value) -> Result<()> {
1946 if !arguments.is_object() {
1947 return Err(P2PError::MCP("Arguments must be an object".to_string()).into());
1948 }
1949 Ok(())
1950 }
1951
1952 fn get_requirements(&self) -> ToolRequirements {
1953 ToolRequirements {
1954 max_memory: Some(1024 * 1024), max_execution_time: Some(Duration::from_secs(5)),
1956 required_capabilities: vec!["test".to_string()],
1957 requires_network: false,
1958 requires_filesystem: false,
1959 }
1960 }
1961 }
1962
1963 async fn create_test_mcp_server() -> MCPServer {
1965 let config = MCPServerConfig {
1966 server_name: "test_server".to_string(),
1967 server_version: "1.0.0".to_string(),
1968 enable_auth: false,
1969 enable_rate_limiting: false,
1970 max_concurrent_requests: 10,
1971 request_timeout: Duration::from_secs(30),
1972 enable_dht_discovery: true,
1973 rate_limit_rpm: 60,
1974 enable_logging: true,
1975 max_tool_execution_time: Duration::from_secs(30),
1976 tool_memory_limit: 100 * 1024 * 1024,
1977 };
1978
1979 MCPServer::new(config)
1980 }
1981
1982 fn create_test_tool(name: &str) -> Tool {
1984 Tool {
1985 definition: MCPTool {
1986 name: name.to_string(),
1987 description: format!("Test tool: {}", name),
1988 input_schema: json!({
1989 "type": "object",
1990 "properties": {
1991 "input": { "type": "string" }
1992 }
1993 }),
1994 },
1995 handler: Box::new(TestTool::new(name)),
1996 metadata: ToolMetadata {
1997 created_at: SystemTime::now(),
1998 last_called: None,
1999 call_count: 0,
2000 avg_execution_time: Duration::from_millis(0),
2001 health_status: ToolHealthStatus::Healthy,
2002 tags: vec!["test".to_string()],
2003 },
2004 }
2005 }
2006
2007 async fn create_test_dht() -> DHT {
2009 let local_id = Key::new(b"test_node_id");
2010 let config = DHTConfig::default();
2011 DHT::new(local_id, config)
2012 }
2013
2014 fn create_test_context(caller_id: PeerId) -> MCPCallContext {
2016 MCPCallContext {
2017 caller_id,
2018 timestamp: SystemTime::now(),
2019 timeout: Duration::from_secs(30),
2020 auth_info: None,
2021 metadata: HashMap::new(),
2022 }
2023 }
2024
2025 #[tokio::test]
2026 async fn test_mcp_server_creation() {
2027 let server = create_test_mcp_server().await;
2028 assert_eq!(server.config.server_name, "test_server");
2029 assert_eq!(server.config.server_version, "1.0.0");
2030 assert!(!server.config.enable_auth);
2031 assert!(!server.config.enable_rate_limiting);
2032 }
2033
2034 #[tokio::test]
2035 async fn test_tool_registration() -> Result<()> {
2036 let server = create_test_mcp_server().await;
2037 let tool = create_test_tool("test_calculator");
2038
2039 server.register_tool(tool).await?;
2041
2042 let tools = server.tools.read().await;
2044 assert!(tools.contains_key("test_calculator"));
2045 assert_eq!(tools.get("test_calculator").unwrap().definition.name, "test_calculator");
2046
2047 let stats = server.stats.read().await;
2049 assert_eq!(stats.total_tools, 1);
2050
2051 Ok(())
2052 }
2053
2054 #[tokio::test]
2055 async fn test_tool_registration_duplicate() -> Result<()> {
2056 let server = create_test_mcp_server().await;
2057 let tool1 = create_test_tool("duplicate_tool");
2058 let tool2 = create_test_tool("duplicate_tool");
2059
2060 server.register_tool(tool1).await?;
2062
2063 let result = server.register_tool(tool2).await;
2065 assert!(result.is_err());
2066 assert!(result.unwrap_err().to_string().contains("Tool already exists"));
2067
2068 Ok(())
2069 }
2070
2071 #[tokio::test]
2072 async fn test_tool_validation() {
2073 let server = create_test_mcp_server().await;
2074
2075 let mut invalid_tool = create_test_tool("");
2077 let result = server.validate_tool(&invalid_tool).await;
2078 assert!(result.is_err());
2079
2080 invalid_tool.definition.name = "a".repeat(200);
2082 let result = server.validate_tool(&invalid_tool).await;
2083 assert!(result.is_err());
2084
2085 let mut invalid_schema_tool = create_test_tool("valid_name");
2087 invalid_schema_tool.definition.input_schema = json!("not an object");
2088 let result = server.validate_tool(&invalid_schema_tool).await;
2089 assert!(result.is_err());
2090
2091 let valid_tool = create_test_tool("valid_tool");
2093 let result = server.validate_tool(&valid_tool).await;
2094 assert!(result.is_ok());
2095 }
2096
2097 #[tokio::test]
2098 async fn test_tool_call_success() -> Result<()> {
2099 let server = create_test_mcp_server().await;
2100 let tool = create_test_tool("success_tool");
2101 server.register_tool(tool).await?;
2102
2103 let caller_id = "test_peer_123".to_string();
2104 let context = create_test_context(caller_id);
2105 let arguments = json!({"input": "test data"});
2106
2107 let result = server.call_tool("success_tool", arguments.clone(), context).await?;
2108
2109 assert_eq!(result["tool"], "success_tool");
2111 assert_eq!(result["arguments"], arguments);
2112 assert_eq!(result["result"], "success");
2113
2114 let tools = server.tools.read().await;
2116 let tool_metadata = &tools.get("success_tool").unwrap().metadata;
2117 assert_eq!(tool_metadata.call_count, 1);
2118 assert!(tool_metadata.last_called.is_some());
2119
2120 Ok(())
2121 }
2122
2123 #[tokio::test]
2124 async fn test_tool_call_nonexistent() -> Result<()> {
2125 let server = create_test_mcp_server().await;
2126 let caller_id = "test_peer_456".to_string();
2127 let context = create_test_context(caller_id);
2128 let arguments = json!({"input": "test"});
2129
2130 let result = server.call_tool("nonexistent_tool", arguments, context).await;
2131 assert!(result.is_err());
2132 assert!(result.unwrap_err().to_string().contains("Tool not found"));
2133
2134 Ok(())
2135 }
2136
2137 #[tokio::test]
2138 async fn test_tool_call_handler_error() -> Result<()> {
2139 let server = create_test_mcp_server().await;
2140 let tool = Tool {
2141 definition: MCPTool {
2142 name: "error_tool".to_string(),
2143 description: "Tool that always errors".to_string(),
2144 input_schema: json!({"type": "object"}),
2145 },
2146 handler: Box::new(TestTool::new("error_tool").with_error()),
2147 metadata: ToolMetadata {
2148 created_at: SystemTime::now(),
2149 last_called: None,
2150 call_count: 0,
2151 avg_execution_time: Duration::from_millis(0),
2152 health_status: ToolHealthStatus::Healthy,
2153 tags: vec![],
2154 },
2155 };
2156
2157 server.register_tool(tool).await?;
2158
2159 let caller_id = "test_peer_error".to_string();
2160 let context = create_test_context(caller_id);
2161 let arguments = json!({"input": "test"});
2162
2163 let result = server.call_tool("error_tool", arguments, context).await;
2164 assert!(result.is_err());
2165 assert!(result.unwrap_err().to_string().contains("Test error from tool error_tool"));
2166
2167 Ok(())
2168 }
2169
2170 #[tokio::test]
2171 async fn test_tool_call_timeout() -> Result<()> {
2172 let server = create_test_mcp_server().await;
2173 let slow_tool = Tool {
2174 definition: MCPTool {
2175 name: "slow_tool".to_string(),
2176 description: "Tool that takes too long".to_string(),
2177 input_schema: json!({"type": "object"}),
2178 },
2179 handler: Box::new(TestTool::new("slow_tool").with_execution_time(Duration::from_secs(2))),
2180 metadata: ToolMetadata {
2181 created_at: SystemTime::now(),
2182 last_called: None,
2183 call_count: 0,
2184 avg_execution_time: Duration::from_millis(0),
2185 health_status: ToolHealthStatus::Healthy,
2186 tags: vec![],
2187 },
2188 };
2189
2190 server.register_tool(slow_tool).await?;
2191
2192 let caller_id = "test_peer_error".to_string();
2193 let context = create_test_context(caller_id);
2194 let arguments = json!({"input": "test"});
2195
2196 let result = timeout(
2198 Duration::from_millis(100),
2199 server.call_tool("slow_tool", arguments, context)
2200 ).await;
2201
2202 assert!(result.is_err()); Ok(())
2205 }
2206
2207 #[tokio::test]
2208 async fn test_tool_requirements() {
2209 let tool = TestTool::new("req_tool");
2210 let requirements = tool.get_requirements();
2211
2212 assert_eq!(requirements.max_memory, Some(1024 * 1024));
2213 assert_eq!(requirements.max_execution_time, Some(Duration::from_secs(5)));
2214 assert_eq!(requirements.required_capabilities, vec!["test"]);
2215 assert!(!requirements.requires_network);
2216 assert!(!requirements.requires_filesystem);
2217 }
2218
2219 #[tokio::test]
2220 async fn test_tool_validation_handler() {
2221 let tool = TestTool::new("validation_tool");
2222
2223 let valid_args = json!({"key": "value"});
2225 assert!(tool.validate(&valid_args).is_ok());
2226
2227 let invalid_args = json!("not an object");
2229 assert!(tool.validate(&invalid_args).is_err());
2230
2231 let invalid_args = json!(123);
2232 assert!(tool.validate(&invalid_args).is_err());
2233 }
2234
2235 #[tokio::test]
2236 async fn test_tool_health_status() {
2237 let mut metadata = ToolMetadata {
2238 created_at: SystemTime::now(),
2239 last_called: None,
2240 call_count: 0,
2241 avg_execution_time: Duration::from_millis(0),
2242 health_status: ToolHealthStatus::Healthy,
2243 tags: vec![],
2244 };
2245
2246 assert_eq!(metadata.health_status, ToolHealthStatus::Healthy);
2248
2249 metadata.health_status = ToolHealthStatus::Degraded;
2250 assert_eq!(metadata.health_status, ToolHealthStatus::Degraded);
2251
2252 metadata.health_status = ToolHealthStatus::Unhealthy;
2253 assert_eq!(metadata.health_status, ToolHealthStatus::Unhealthy);
2254
2255 metadata.health_status = ToolHealthStatus::Disabled;
2256 assert_eq!(metadata.health_status, ToolHealthStatus::Disabled);
2257 }
2258
2259 #[tokio::test]
2260 async fn test_mcp_capabilities() {
2261 let server = create_test_mcp_server().await;
2262 let capabilities = server.get_server_capabilities().await;
2263
2264 assert!(capabilities.tools.is_some());
2265 assert!(capabilities.prompts.is_some());
2266 assert!(capabilities.resources.is_some());
2267 assert!(capabilities.logging.is_some());
2268
2269 let tools_cap = capabilities.tools.unwrap();
2270 assert_eq!(tools_cap.list_changed, Some(true));
2271
2272 let logging_cap = capabilities.logging.unwrap();
2273 let levels = logging_cap.levels.unwrap();
2274 assert!(levels.contains(&MCPLogLevel::Debug));
2275 assert!(levels.contains(&MCPLogLevel::Info));
2276 assert!(levels.contains(&MCPLogLevel::Warning));
2277 assert!(levels.contains(&MCPLogLevel::Error));
2278 }
2279
2280 #[tokio::test]
2281 async fn test_mcp_message_serialization() {
2282 let init_msg = MCPMessage::Initialize {
2284 protocol_version: MCP_VERSION.to_string(),
2285 capabilities: MCPCapabilities {
2286 experimental: None,
2287 sampling: None,
2288 tools: Some(MCPToolsCapability { list_changed: Some(true) }),
2289 prompts: None,
2290 resources: None,
2291 logging: None,
2292 },
2293 client_info: MCPClientInfo {
2294 name: "test_client".to_string(),
2295 version: "1.0.0".to_string(),
2296 },
2297 };
2298
2299 let serialized = serde_json::to_string(&init_msg).unwrap();
2300 let deserialized: MCPMessage = serde_json::from_str(&serialized).unwrap();
2301
2302 match deserialized {
2303 MCPMessage::Initialize { protocol_version, client_info, .. } => {
2304 assert_eq!(protocol_version, MCP_VERSION);
2305 assert_eq!(client_info.name, "test_client");
2306 assert_eq!(client_info.version, "1.0.0");
2307 }
2308 _ => panic!("Wrong message type after deserialization"),
2309 }
2310 }
2311
2312 #[tokio::test]
2313 async fn test_mcp_content_types() {
2314 let text_content = MCPContent::Text {
2316 text: "Hello, world!".to_string(),
2317 };
2318
2319 let serialized = serde_json::to_string(&text_content).unwrap();
2320 let deserialized: MCPContent = serde_json::from_str(&serialized).unwrap();
2321
2322 match deserialized {
2323 MCPContent::Text { text } => assert_eq!(text, "Hello, world!"),
2324 _ => panic!("Wrong content type"),
2325 }
2326
2327 let image_content = MCPContent::Image {
2329 data: "base64data".to_string(),
2330 mime_type: "image/png".to_string(),
2331 };
2332
2333 let serialized = serde_json::to_string(&image_content).unwrap();
2334 let deserialized: MCPContent = serde_json::from_str(&serialized).unwrap();
2335
2336 match deserialized {
2337 MCPContent::Image { data, mime_type } => {
2338 assert_eq!(data, "base64data");
2339 assert_eq!(mime_type, "image/png");
2340 }
2341 _ => panic!("Wrong content type"),
2342 }
2343 }
2344
2345 #[tokio::test]
2346 async fn test_service_health_status() {
2347 let mut metrics = ServiceLoadMetrics {
2348 active_requests: 0,
2349 requests_per_second: 0.0,
2350 avg_response_time_ms: 0.0,
2351 error_rate: 0.0,
2352 cpu_usage: 0.0,
2353 memory_usage: 0,
2354 };
2355
2356 let metadata = MCPServiceMetadata {
2358 name: "test_service".to_string(),
2359 version: "1.0.0".to_string(),
2360 description: Some("Test service".to_string()),
2361 tags: vec!["test".to_string()],
2362 health_status: ServiceHealthStatus::Healthy,
2363 load_metrics: metrics.clone(),
2364 };
2365
2366 assert_eq!(metadata.health_status, ServiceHealthStatus::Healthy);
2367
2368 metrics.error_rate = 0.5; let degraded_metadata = MCPServiceMetadata {
2371 health_status: ServiceHealthStatus::Degraded,
2372 load_metrics: metrics.clone(),
2373 ..metadata.clone()
2374 };
2375
2376 assert_eq!(degraded_metadata.health_status, ServiceHealthStatus::Degraded);
2377
2378 let unhealthy_metadata = MCPServiceMetadata {
2379 health_status: ServiceHealthStatus::Unhealthy,
2380 ..metadata.clone()
2381 };
2382
2383 assert_eq!(unhealthy_metadata.health_status, ServiceHealthStatus::Unhealthy);
2384 }
2385
2386 #[tokio::test]
2387 async fn test_p2p_mcp_message() {
2388 let source_peer = "source_peer_123".to_string();
2389 let target_peer = "target_peer_456".to_string();
2390
2391 let p2p_message = P2PMCPMessage {
2392 message_type: P2PMCPMessageType::Request,
2393 message_id: uuid::Uuid::new_v4().to_string(),
2394 source_peer: source_peer.clone(),
2395 target_peer: Some(target_peer.clone()),
2396 timestamp: SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(),
2397 payload: MCPMessage::ListTools { cursor: None },
2398 ttl: 10,
2399 };
2400
2401 let serialized = serde_json::to_string(&p2p_message).unwrap();
2403 let deserialized: P2PMCPMessage = serde_json::from_str(&serialized).unwrap();
2404
2405 assert_eq!(deserialized.message_type, P2PMCPMessageType::Request);
2406 assert_eq!(deserialized.source_peer, source_peer);
2407 assert_eq!(deserialized.target_peer, Some(target_peer));
2408 assert_eq!(deserialized.ttl, 10);
2409
2410 match deserialized.payload {
2411 MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
2412 _ => panic!("Wrong message payload type"),
2413 }
2414 }
2415
2416 #[tokio::test]
2417 async fn test_tool_requirements_default() {
2418 let default_requirements = ToolRequirements::default();
2419
2420 assert_eq!(default_requirements.max_memory, Some(100 * 1024 * 1024));
2421 assert_eq!(default_requirements.max_execution_time, Some(Duration::from_secs(30)));
2422 assert!(default_requirements.required_capabilities.is_empty());
2423 assert!(!default_requirements.requires_network);
2424 assert!(!default_requirements.requires_filesystem);
2425 }
2426
2427 #[tokio::test]
2428 async fn test_mcp_server_stats() {
2429 let server = create_test_mcp_server().await;
2430
2431 let stats = server.stats.read().await;
2433 assert_eq!(stats.total_tools, 0);
2434 assert_eq!(stats.total_requests, 0);
2435 assert_eq!(stats.total_responses, 0);
2436 assert_eq!(stats.total_errors, 0);
2437
2438 drop(stats);
2439
2440 let tool = create_test_tool("stats_test_tool");
2442 server.register_tool(tool).await.unwrap();
2443
2444 let stats = server.stats.read().await;
2445 assert_eq!(stats.total_tools, 1);
2446 }
2447
2448 #[tokio::test]
2449 async fn test_log_levels() {
2450 let levels = vec![
2452 MCPLogLevel::Debug,
2453 MCPLogLevel::Info,
2454 MCPLogLevel::Notice,
2455 MCPLogLevel::Warning,
2456 MCPLogLevel::Error,
2457 MCPLogLevel::Critical,
2458 MCPLogLevel::Alert,
2459 MCPLogLevel::Emergency,
2460 ];
2461
2462 for level in levels {
2463 let serialized = serde_json::to_string(&level).unwrap();
2464 let deserialized: MCPLogLevel = serde_json::from_str(&serialized).unwrap();
2465 assert_eq!(level as u8, deserialized as u8);
2466 }
2467 }
2468
2469 #[tokio::test]
2470 async fn test_mcp_endpoint() {
2471 let endpoint = MCPEndpoint {
2472 protocol: "p2p".to_string(),
2473 address: "127.0.0.1".to_string(),
2474 port: Some(9000),
2475 tls: true,
2476 auth_required: true,
2477 };
2478
2479 let serialized = serde_json::to_string(&endpoint).unwrap();
2480 let deserialized: MCPEndpoint = serde_json::from_str(&serialized).unwrap();
2481
2482 assert_eq!(deserialized.protocol, "p2p");
2483 assert_eq!(deserialized.address, "127.0.0.1");
2484 assert_eq!(deserialized.port, Some(9000));
2485 assert!(deserialized.tls);
2486 assert!(deserialized.auth_required);
2487 }
2488
2489 #[tokio::test]
2490 async fn test_mcp_service_metadata() {
2491 let load_metrics = ServiceLoadMetrics {
2492 active_requests: 5,
2493 requests_per_second: 10.5,
2494 avg_response_time_ms: 250.0,
2495 error_rate: 0.01,
2496 cpu_usage: 45.5,
2497 memory_usage: 1024 * 1024 * 100, };
2499
2500 let metadata = MCPServiceMetadata {
2501 name: "test_service".to_string(),
2502 version: "2.1.0".to_string(),
2503 description: Some("A test service for unit testing".to_string()),
2504 tags: vec!["test".to_string(), "unit".to_string(), "mcp".to_string()],
2505 health_status: ServiceHealthStatus::Healthy,
2506 load_metrics,
2507 };
2508
2509 let serialized = serde_json::to_string(&metadata).unwrap();
2511 let deserialized: MCPServiceMetadata = serde_json::from_str(&serialized).unwrap();
2512
2513 assert_eq!(deserialized.name, "test_service");
2514 assert_eq!(deserialized.version, "2.1.0");
2515 assert_eq!(deserialized.description, Some("A test service for unit testing".to_string()));
2516 assert_eq!(deserialized.tags, vec!["test", "unit", "mcp"]);
2517 assert_eq!(deserialized.health_status, ServiceHealthStatus::Healthy);
2518 assert_eq!(deserialized.load_metrics.active_requests, 5);
2519 assert_eq!(deserialized.load_metrics.requests_per_second, 10.5);
2520 }
2521
2522 #[tokio::test]
2523 async fn test_function_tool_handler() {
2524 let handler = FunctionToolHandler::new(|args: Value| async move {
2526 let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("world");
2527 Ok(json!({"greeting": format!("Hello, {}!", name)}))
2528 });
2529
2530 let args = json!({"name": "Alice"});
2531 let result = handler.execute(args).await.unwrap();
2532 assert_eq!(result["greeting"], "Hello, Alice!");
2533
2534 let empty_args = json!({});
2536 let result = handler.execute(empty_args).await.unwrap();
2537 assert_eq!(result["greeting"], "Hello, world!");
2538 }
2539
2540 #[tokio::test]
2541 async fn test_mcp_service_creation() {
2542 let service_id = "test_service_123".to_string();
2543 let node_id = "test_node_789".to_string();
2544
2545 let service = MCPService::new(service_id.clone(), node_id.clone());
2546
2547 assert_eq!(service.service_id, service_id);
2548 assert_eq!(service.node_id, node_id);
2549 assert!(service.tools.is_empty());
2550 assert_eq!(service.metadata.name, "MCP Service");
2551 assert_eq!(service.metadata.version, "1.0.0");
2552 assert_eq!(service.metadata.health_status, ServiceHealthStatus::Healthy);
2553 assert_eq!(service.endpoint.protocol, "p2p");
2554 assert!(!service.endpoint.tls);
2555 assert!(!service.endpoint.auth_required);
2556 }
2557
2558 #[tokio::test]
2559 async fn test_mcp_capabilities_default() {
2560 let capabilities = MCPCapabilities::default();
2561
2562 assert!(capabilities.tools.is_some());
2563 assert!(capabilities.prompts.is_some());
2564 assert!(capabilities.resources.is_some());
2565 assert!(capabilities.logging.is_some());
2566
2567 let tools_cap = capabilities.tools.unwrap();
2568 assert_eq!(tools_cap.list_changed, Some(true));
2569
2570 let resources_cap = capabilities.resources.unwrap();
2571 assert_eq!(resources_cap.subscribe, Some(true));
2572 assert_eq!(resources_cap.list_changed, Some(true));
2573
2574 let logging_cap = capabilities.logging.unwrap();
2575 let levels = logging_cap.levels.unwrap();
2576 assert!(levels.contains(&MCPLogLevel::Debug));
2577 assert!(levels.contains(&MCPLogLevel::Info));
2578 assert!(levels.contains(&MCPLogLevel::Warning));
2579 assert!(levels.contains(&MCPLogLevel::Error));
2580 }
2581
2582 #[tokio::test]
2583 async fn test_mcp_request_creation() {
2584 let source_peer = "source_peer_123".to_string();
2585 let target_peer = "target_peer_456".to_string();
2586
2587 let request = MCPRequest {
2588 request_id: uuid::Uuid::new_v4().to_string(),
2589 source_peer: source_peer.clone(),
2590 target_peer: target_peer.clone(),
2591 message: MCPMessage::ListTools { cursor: None },
2592 timestamp: SystemTime::now(),
2593 timeout: Duration::from_secs(30),
2594 auth_token: Some("test_token".to_string()),
2595 };
2596
2597 assert_eq!(request.source_peer, source_peer);
2598 assert_eq!(request.target_peer, target_peer);
2599 assert_eq!(request.timeout, Duration::from_secs(30));
2600 assert_eq!(request.auth_token, Some("test_token".to_string()));
2601
2602 match request.message {
2603 MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
2604 _ => panic!("Wrong message type"),
2605 }
2606 }
2607
2608 #[tokio::test]
2609 async fn test_p2p_message_types() {
2610 assert_eq!(P2PMCPMessageType::Request, P2PMCPMessageType::Request);
2612 assert_eq!(P2PMCPMessageType::Response, P2PMCPMessageType::Response);
2613 assert_eq!(P2PMCPMessageType::ServiceAdvertisement, P2PMCPMessageType::ServiceAdvertisement);
2614 assert_eq!(P2PMCPMessageType::ServiceDiscovery, P2PMCPMessageType::ServiceDiscovery);
2615
2616 for msg_type in [
2618 P2PMCPMessageType::Request,
2619 P2PMCPMessageType::Response,
2620 P2PMCPMessageType::ServiceAdvertisement,
2621 P2PMCPMessageType::ServiceDiscovery,
2622 ] {
2623 let serialized = serde_json::to_string(&msg_type).unwrap();
2624 let deserialized: P2PMCPMessageType = serde_json::from_str(&serialized).unwrap();
2625 assert_eq!(msg_type, deserialized);
2626 }
2627 }
2628}