1pub mod security;
13
14use crate::dht::{Key, DHT};
15use crate::{PeerId, Result, P2PError};
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value};
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime, Instant};
21use tokio::sync::{RwLock, mpsc, oneshot};
22use tokio::time::timeout;
23use tracing::{debug, info, warn};
24use rand;
25
26pub use security::*;
27
/// MCP protocol version identifier used by this module.
pub const MCP_VERSION: &str = "2024-11-05";

/// Size limit of 1 MiB (`1024 * 1024` bytes).
///
/// NOTE(review): presumably the cap for a single MCP message; the place
/// where it is enforced is not visible here — confirm against callers.
pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;

/// Default call timeout of 30 seconds; also the default value of
/// `MCPServerConfig::request_timeout`.
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);

/// Protocol identifier handed to `NetworkSender::send_message` for MCP traffic.
pub const MCP_PROTOCOL: &str = "/p2p-foundation/mcp/1.0.0";
39
/// Abstraction over the transport used to deliver MCP payloads to peers.
///
/// Implementations must be `Send + Sync` so they can be shared between tasks.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Sends the raw bytes in `data` to `peer_id` under the given `protocol` identifier.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the identifier of the local peer.
    fn local_peer_id(&self) -> &PeerId;
}
49
/// Shared, thread-safe callback with the same argument shape as
/// `NetworkSender::send_message` (peer, protocol, payload), but synchronous.
pub type MessageSender = Arc<dyn Fn(&PeerId, &str, Vec<u8>) -> Result<()> + Send + Sync>;
52
/// Pause of 60 seconds between two DHT lookups performed by the service discovery task.
pub const SERVICE_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
55
/// MCP message payloads.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in snake_case under the `type` key, next to the variant's fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MCPMessage {
    /// Initialization message carrying the client's version, capabilities and identity.
    Initialize {
        protocol_version: String,
        capabilities: MCPCapabilities,
        client_info: MCPClientInfo,
    },
    /// Initialization result carrying the server's version, capabilities and identity.
    InitializeResult {
        protocol_version: String,
        capabilities: MCPCapabilities,
        server_info: MCPServerInfo,
    },
    /// Tool listing request with an optional pagination cursor.
    ListTools {
        cursor: Option<String>,
    },
    /// Tool listing result; `next_cursor` is `None` when there is no further page.
    ListToolsResult {
        tools: Vec<MCPTool>,
        next_cursor: Option<String>,
    },
    /// Tool invocation: tool name plus JSON arguments.
    CallTool {
        name: String,
        arguments: Value,
    },
    /// Tool invocation result; `is_error` flags a failed call.
    CallToolResult {
        content: Vec<MCPContent>,
        is_error: bool,
    },
    /// Prompt listing request with an optional pagination cursor.
    ListPrompts {
        cursor: Option<String>,
    },
    /// Prompt listing result.
    ListPromptsResult {
        prompts: Vec<MCPPrompt>,
        next_cursor: Option<String>,
    },
    /// Request for a single prompt by name, with optional JSON arguments.
    GetPrompt {
        name: String,
        arguments: Option<Value>,
    },
    /// Prompt lookup result.
    GetPromptResult {
        description: Option<String>,
        messages: Vec<MCPPromptMessage>,
    },
    /// Resource listing request with an optional pagination cursor.
    ListResources {
        cursor: Option<String>,
    },
    /// Resource listing result.
    ListResourcesResult {
        resources: Vec<MCPResource>,
        next_cursor: Option<String>,
    },
    /// Request to read the resource identified by `uri`.
    ReadResource {
        uri: String,
    },
    /// Resource read result.
    ReadResourceResult {
        contents: Vec<MCPResourceContent>,
    },
    /// Subscription request for the resource identified by `uri`.
    SubscribeResource {
        uri: String,
    },
    /// Unsubscription request for the resource identified by `uri`.
    UnsubscribeResource {
        uri: String,
    },
    /// Notification naming the resource `uri` that was updated.
    ResourceUpdated {
        uri: String,
    },
    /// Log listing request with an optional pagination cursor.
    ListLogs {
        cursor: Option<String>,
    },
    /// Log listing result.
    ListLogsResult {
        logs: Vec<MCPLogEntry>,
        next_cursor: Option<String>,
    },
    /// Request to set the log level.
    SetLogLevel {
        level: MCPLogLevel,
    },
    /// Error report with a numeric code, a message and optional JSON details.
    Error {
        code: i32,
        message: String,
        data: Option<Value>,
    },
}
194
/// Capability set exchanged in `Initialize` / `InitializeResult` messages and
/// carried in heartbeats and service records.
///
/// Every field is optional; `None` means the capability group is not declared.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPCapabilities {
    /// Free-form JSON describing experimental capabilities.
    pub experimental: Option<Value>,
    /// Free-form JSON describing sampling capabilities.
    pub sampling: Option<Value>,
    /// Tool-related capabilities.
    pub tools: Option<MCPToolsCapability>,
    /// Prompt-related capabilities.
    pub prompts: Option<MCPPromptsCapability>,
    /// Resource-related capabilities.
    pub resources: Option<MCPResourcesCapability>,
    /// Logging-related capabilities.
    pub logging: Option<MCPLoggingCapability>,
}
211
/// Tool capability flags within [`MCPCapabilities`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPToolsCapability {
    /// Optional `list_changed` flag; the server advertises `Some(true)`.
    pub list_changed: Option<bool>,
}
218
/// Prompt capability flags within [`MCPCapabilities`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPromptsCapability {
    /// Optional `list_changed` flag; the server advertises `Some(true)`.
    pub list_changed: Option<bool>,
}
225
/// Resource capability flags within [`MCPCapabilities`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourcesCapability {
    /// Optional `subscribe` flag; the server advertises `Some(true)`.
    pub subscribe: Option<bool>,
    /// Optional `list_changed` flag; the server advertises `Some(true)`.
    pub list_changed: Option<bool>,
}
234
/// Logging capability description within [`MCPCapabilities`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPLoggingCapability {
    /// Optional list of log levels.
    pub levels: Option<Vec<MCPLogLevel>>,
}
241
/// Client identity carried by `MCPMessage::Initialize`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPClientInfo {
    /// Client name.
    pub name: String,
    /// Client version string.
    pub version: String,
}
250
/// Server identity carried by `MCPMessage::InitializeResult`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServerInfo {
    /// Server name.
    pub name: String,
    /// Server version string.
    pub version: String,
}
259
/// Serializable description of a tool, as returned by tool listings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPTool {
    /// Tool name; registration rejects empty names and names longer than 100 bytes.
    pub name: String,
    /// Human-readable description.
    pub description: String,
    /// JSON schema of the tool input; registration requires a JSON object.
    pub input_schema: Value,
}
270
/// A registered tool: its public definition, its executable handler and
/// the bookkeeping kept by the server.
pub struct Tool {
    /// Definition exposed through tool listings.
    pub definition: MCPTool,
    /// Handler invoked to validate arguments and execute the tool.
    pub handler: Box<dyn ToolHandler + Send + Sync>,
    /// Call statistics and health state, updated after executions.
    pub metadata: ToolMetadata,
}
280
/// Per-tool statistics and health information.
#[derive(Debug, Clone)]
pub struct ToolMetadata {
    /// Creation time of the tool entry.
    pub created_at: SystemTime,
    /// Time of the most recent recorded call, if any.
    pub last_called: Option<SystemTime>,
    /// Number of recorded calls.
    pub call_count: u64,
    /// Running mean of the recorded execution times.
    pub avg_execution_time: Duration,
    /// Current health state of the tool.
    pub health_status: ToolHealthStatus,
    /// Free-form tags.
    pub tags: Vec<String>,
}
297
/// Health state of a single tool.
///
/// A failed call moves `Healthy` to `Degraded` and `Degraded` to `Unhealthy`;
/// a successful call resets every state except `Disabled` to `Healthy`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ToolHealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
    /// Never changed by call outcomes.
    Disabled,
}
310
311
/// Settings for the background health monitoring tasks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMonitorConfig {
    /// Period of the task that health-checks known remote services.
    pub health_check_interval: Duration,
    /// Upper bound on how long sending one health check may take.
    pub health_check_timeout: Duration,
    /// Consecutive failures after which a service is marked unhealthy.
    pub failure_threshold: u32,
    /// Consecutive successes after which a service is marked healthy.
    pub success_threshold: u32,
    /// Period of the task that sends heartbeats.
    pub heartbeat_interval: Duration,
    /// Heartbeat timeout handed to the heartbeat timeout check.
    pub heartbeat_timeout: Duration,
    /// When `false`, no health monitoring task is started.
    pub enabled: bool,
}
330
331impl Default for HealthMonitorConfig {
332 fn default() -> Self {
333 Self {
334 health_check_interval: Duration::from_secs(30),
335 health_check_timeout: Duration::from_secs(5),
336 failure_threshold: 3,
337 success_threshold: 2,
338 heartbeat_interval: Duration::from_secs(60),
339 heartbeat_timeout: Duration::from_secs(300), enabled: true,
341 }
342 }
343}
344
/// Health record kept per remote service by the health monitor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceHealth {
    /// Identifier of the monitored service.
    pub service_id: String,
    /// Current health state; new records start as `Unknown`.
    pub status: ServiceHealthStatus,
    /// Time of the last successful health check, if any.
    pub last_health_check: Option<SystemTime>,
    /// Time of the last heartbeat, if any.
    pub last_heartbeat: Option<SystemTime>,
    /// Consecutive failed checks; reset to 0 by a success.
    pub failure_count: u32,
    /// Consecutive successful checks; reset to 0 by a failure.
    pub success_count: u32,
    /// Mean response time over the entries in `health_history`.
    pub avg_response_time: Duration,
    /// Description of the current problem, if any.
    pub error_message: Option<String>,
    /// Most recent check results; the health check keeps at most 10 entries.
    pub health_history: Vec<HealthCheckResult>,
}
367
/// Outcome of a single health check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
    /// When the result was recorded.
    pub timestamp: SystemTime,
    /// Whether the check succeeded.
    pub success: bool,
    /// Time the check took.
    pub response_time: Duration,
    /// Failure description; `None` for successful checks.
    pub error_message: Option<String>,
}
380
/// Heartbeat payload describing a service instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
    /// Identifier of the service sending the heartbeat.
    pub service_id: String,
    /// Peer the heartbeat originates from.
    pub peer_id: PeerId,
    /// Creation time of the heartbeat.
    pub timestamp: SystemTime,
    /// Load figure reported by the sender.
    /// NOTE(review): scale/range is not defined in this view — confirm.
    pub load: f32,
    /// Names of the tools the sender offers.
    pub available_tools: Vec<String>,
    /// Capabilities of the sender.
    pub capabilities: MCPCapabilities,
}
397
/// Events published on the server's health event channel.
#[derive(Debug, Clone)]
pub enum HealthEvent {
    /// A service's status changed to healthy.
    ServiceHealthy {
        service_id: String,
        peer_id: PeerId,
    },
    /// A service's status changed to unhealthy; `error` holds the recorded message.
    ServiceUnhealthy {
        service_id: String,
        peer_id: PeerId,
        error: String,
    },
    /// A service's status changed to degraded.
    ServiceDegraded {
        service_id: String,
        peer_id: PeerId,
        reason: String,
    },
    /// A heartbeat was received, with the load it reported.
    HeartbeatReceived {
        service_id: String,
        peer_id: PeerId,
        load: f32,
    },
    /// A heartbeat timeout was detected for the service.
    HeartbeatTimeout {
        service_id: String,
        peer_id: PeerId,
    },
}
430
/// Behavior a tool implementation must provide to be registered with the server.
pub trait ToolHandler {
    /// Runs the tool with the given JSON `arguments`, returning a boxed `Send`
    /// future that resolves to the tool's JSON result or an error.
    fn execute(&self, arguments: Value) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>>;

    /// Checks `arguments` before execution.
    ///
    /// The default implementation accepts every input.
    fn validate(&self, arguments: &Value) -> Result<()> {
        // The default performs no validation; silence the unused-parameter warning.
        let _ = arguments;
        Ok(())
    }

    /// Resource requirements of the tool; defaults to `ToolRequirements::default()`.
    fn get_requirements(&self) -> ToolRequirements {
        ToolRequirements::default()
    }
}
448
/// Resource requirements declared by a tool handler.
#[derive(Debug, Clone)]
pub struct ToolRequirements {
    /// Declared memory need in bytes; compared against the server's `tool_memory_limit`.
    pub max_memory: Option<u64>,
    /// Declared execution time; compared against the server's
    /// `max_tool_execution_time` and used to cap the call timeout.
    pub max_execution_time: Option<Duration>,
    /// Names of capabilities the tool needs.
    pub required_capabilities: Vec<String>,
    /// Whether the tool needs network access.
    pub requires_network: bool,
    /// Whether the tool needs filesystem access.
    pub requires_filesystem: bool,
}
463
464impl Default for ToolRequirements {
465 fn default() -> Self {
466 Self {
467 max_memory: Some(100 * 1024 * 1024), max_execution_time: Some(Duration::from_secs(30)),
469 required_capabilities: Vec::new(),
470 requires_network: false,
471 requires_filesystem: false,
472 }
473 }
474}
475
/// Content item used in tool results and prompt messages.
///
/// Serialized as an internally tagged enum with the snake_case variant name
/// under the `type` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MCPContent {
    /// Plain text.
    Text {
        text: String,
    },
    /// Image data with its MIME type.
    /// NOTE(review): `data` is presumably base64-encoded — confirm with producers.
    Image {
        data: String,
        mime_type: String,
    },
    /// Reference to a resource.
    Resource {
        resource: MCPResourceReference,
    },
}
498
/// Reference to a resource by URI.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourceReference {
    /// URI of the referenced resource.
    pub uri: String,
    /// Optional type of the resource.
    /// NOTE(review): no serde rename is applied, so this is serialized under
    /// the key `type_` — confirm that is the intended wire name.
    pub type_: Option<String>,
}
507
/// Prompt description as returned by prompt listings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPrompt {
    /// Prompt name.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Optional JSON value describing the prompt's arguments.
    pub arguments: Option<Value>,
}
518
/// One message of a prompt: who speaks and what is said.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPromptMessage {
    /// Role of the message author.
    pub role: MCPRole,
    /// Content of the message.
    pub content: MCPContent,
}
527
/// Author role of a prompt message; serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MCPRole {
    User,
    Assistant,
    System,
}
539
/// Resource description as returned by resource listings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResource {
    /// URI identifying the resource.
    pub uri: String,
    /// Resource name.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Optional MIME type of the resource.
    pub mime_type: Option<String>,
}
552
/// Content of a resource as returned by a resource read.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourceContent {
    /// URI of the resource the content belongs to.
    pub uri: String,
    /// MIME type of the content.
    pub mime_type: String,
    /// Optional textual content.
    pub text: Option<String>,
    /// Optional binary content carried as a string.
    /// NOTE(review): presumably base64-encoded — confirm with producers.
    pub blob: Option<String>,
}
565
/// Log levels; serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum MCPLogLevel {
    Debug,
    Info,
    Notice,
    Warning,
    Error,
    Critical,
    Alert,
    Emergency,
}
587
/// A single log entry as returned by log listings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPLogEntry {
    /// Level of the entry.
    pub level: MCPLogLevel,
    /// JSON payload of the entry.
    pub data: Value,
    /// Optional name of the logger that produced the entry.
    pub logger: Option<String>,
}
598
/// Description of an MCP service; service discovery deserializes a list of
/// these from the DHT and caches them by `service_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPService {
    /// Identifier of the service; key of the service caches.
    pub service_id: String,
    /// Peer hosting the service; target of health checks.
    pub node_id: PeerId,
    /// Names of the tools the service offers.
    pub tools: Vec<String>,
    /// Capabilities of the service.
    pub capabilities: MCPCapabilities,
    /// Descriptive metadata, health and load information.
    pub metadata: MCPServiceMetadata,
    /// Registration time of the service.
    pub registered_at: SystemTime,
    /// Endpoint description of the service.
    pub endpoint: MCPEndpoint,
}
617
/// Descriptive and operational metadata of an [`MCPService`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServiceMetadata {
    /// Service name.
    pub name: String,
    /// Service version string.
    pub version: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Free-form tags.
    pub tags: Vec<String>,
    /// Health state recorded for the service.
    pub health_status: ServiceHealthStatus,
    /// Load figures recorded for the service.
    pub load_metrics: ServiceLoadMetrics,
}
634
/// Health state of a service.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum ServiceHealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
    Disabled,
    /// Initial state of a health record before enough checks have completed.
    Unknown,
}
649
/// Load figures of a service.
///
/// NOTE(review): units and ranges of `error_rate`, `cpu_usage` and
/// `memory_usage` are not defined in this view — confirm with producers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceLoadMetrics {
    /// Number of requests currently in flight.
    pub active_requests: u32,
    /// Request rate per second.
    pub requests_per_second: f64,
    /// Average response time in milliseconds.
    pub avg_response_time_ms: f64,
    /// Error rate.
    pub error_rate: f64,
    /// CPU usage.
    pub cpu_usage: f64,
    /// Memory usage.
    pub memory_usage: u64,
}
666
/// Endpoint description of an [`MCPService`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPEndpoint {
    /// Protocol name.
    pub protocol: String,
    /// Address of the endpoint.
    pub address: String,
    /// Optional port.
    pub port: Option<u16>,
    /// Whether TLS is used.
    pub tls: bool,
    /// Whether authentication is required.
    pub auth_required: bool,
}
681
/// An MCP request together with its routing and timing information.
#[derive(Debug, Clone)]
pub struct MCPRequest {
    /// Identifier of the request.
    pub request_id: String,
    /// Peer that issued the request.
    pub source_peer: PeerId,
    /// Peer the request is addressed to.
    pub target_peer: PeerId,
    /// The MCP message being sent.
    pub message: MCPMessage,
    /// Creation time of the request.
    pub timestamp: SystemTime,
    /// Timeout associated with the request.
    pub timeout: Duration,
    /// Optional authentication token.
    pub auth_token: Option<String>,
}
700
/// Serializable envelope wrapping an [`MCPMessage`] for transport between peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct P2PMCPMessage {
    /// Kind of envelope.
    pub message_type: P2PMCPMessageType,
    /// Identifier of the message.
    pub message_id: String,
    /// Peer that sent the message.
    pub source_peer: PeerId,
    /// Optional addressee.
    /// NOTE(review): `None` presumably means broadcast — confirm.
    pub target_peer: Option<PeerId>,
    /// Timestamp as an integer.
    /// NOTE(review): presumably seconds since the Unix epoch — confirm.
    pub timestamp: u64,
    /// The wrapped MCP message.
    pub payload: MCPMessage,
    /// Time-to-live counter.
    pub ttl: u8,
}
719
/// Kind of a [`P2PMCPMessage`] envelope.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum P2PMCPMessageType {
    Request,
    Response,
    ServiceAdvertisement,
    ServiceDiscovery,
    Heartbeat,
    HealthCheck,
}
736
/// Response to an [`MCPRequest`].
#[derive(Debug, Clone)]
pub struct MCPResponse {
    /// Identifier of the request this response belongs to.
    pub request_id: String,
    /// The MCP message carried by the response.
    pub message: MCPMessage,
    /// Creation time of the response.
    pub timestamp: SystemTime,
    /// Time spent processing the request.
    pub processing_time: Duration,
}
749
/// Per-call context passed to `MCPServer::call_tool`.
#[derive(Debug, Clone)]
pub struct MCPCallContext {
    /// Calling peer; subject of the rate limit, permission and trust checks.
    pub caller_id: PeerId,
    /// Time the call was issued.
    pub timestamp: SystemTime,
    /// Upper bound on the tool's execution time for this call.
    pub timeout: Duration,
    /// Authentication data; its token is verified for `Basic`-level tools
    /// when authentication is enabled.
    pub auth_info: Option<MCPAuthInfo>,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
}
764
/// Authentication data attached to a call.
#[derive(Debug, Clone)]
pub struct MCPAuthInfo {
    /// The authentication token.
    pub token: String,
    /// Kind of token.
    pub token_type: String,
    /// Optional expiry time of the token.
    pub expires_at: Option<SystemTime>,
    /// Permission names associated with the token.
    pub permissions: Vec<String>,
}
777
/// Configuration of an [`MCPServer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServerConfig {
    /// Server name; logged when the server starts.
    pub server_name: String,
    /// Server version string.
    pub server_version: String,
    /// DHT discovery switch.
    /// NOTE(review): the visible start-up code keys discovery off the presence
    /// of a DHT handle, not this flag — confirm where it is consulted.
    pub enable_dht_discovery: bool,
    /// Maximum number of concurrent requests.
    pub max_concurrent_requests: usize,
    /// Request timeout.
    pub request_timeout: Duration,
    /// When `true`, a security manager is created and `Basic`-level tool calls
    /// must carry a valid auth token.
    pub enable_auth: bool,
    /// Rate limiting switch.
    pub enable_rate_limiting: bool,
    /// Rate limit value handed to the security manager.
    pub rate_limit_rpm: u32,
    /// Logging switch.
    pub enable_logging: bool,
    /// Largest execution time a tool may declare in its requirements.
    pub max_tool_execution_time: Duration,
    /// Largest memory amount (bytes) a tool may declare in its requirements.
    pub tool_memory_limit: u64,
    /// Health monitoring settings.
    pub health_monitor: HealthMonitorConfig,
}
806
807impl Default for MCPServerConfig {
808 fn default() -> Self {
809 Self {
810 server_name: "P2P-MCP-Server".to_string(),
811 server_version: crate::VERSION.to_string(),
812 enable_dht_discovery: true,
813 max_concurrent_requests: 100,
814 request_timeout: DEFAULT_CALL_TIMEOUT,
815 enable_auth: true,
816 enable_rate_limiting: true,
817 rate_limit_rpm: 60,
818 enable_logging: true,
819 max_tool_execution_time: Duration::from_secs(30),
820 tool_memory_limit: 100 * 1024 * 1024, health_monitor: HealthMonitorConfig::default(),
822 }
823 }
824}
825
/// MCP server state: registries, caches, channels and collaborators.
///
/// Mutable state sits behind `Arc<RwLock<..>>` so background tasks can share it.
pub struct MCPServer {
    /// Server configuration.
    config: MCPServerConfig,
    /// Registered tools keyed by tool name.
    tools: Arc<RwLock<HashMap<String, Tool>>>,
    /// Prompts keyed by string.
    #[allow(dead_code)]
    prompts: Arc<RwLock<HashMap<String, MCPPrompt>>>,
    /// Resources keyed by string.
    #[allow(dead_code)]
    resources: Arc<RwLock<HashMap<String, MCPResource>>>,
    /// Sessions keyed by string.
    sessions: Arc<RwLock<HashMap<String, MCPSession>>>,
    /// One-shot response senders keyed by string.
    request_handlers: Arc<RwLock<HashMap<String, oneshot::Sender<MCPResponse>>>>,
    /// Optional DHT handle; when present, service discovery is started and
    /// registered tools are published.
    dht: Option<Arc<RwLock<DHT>>>,
    /// Services keyed by string.
    local_services: Arc<RwLock<HashMap<String, MCPService>>>,
    /// Cache of discovered remote services keyed by service id.
    remote_services: Arc<RwLock<HashMap<String, MCPService>>>,
    /// Aggregate server statistics.
    stats: Arc<RwLock<MCPServerStats>>,
    /// Sending half of the request channel.
    request_tx: mpsc::UnboundedSender<MCPRequest>,
    /// Receiving half of the response channel.
    #[allow(dead_code)]
    response_rx: Arc<RwLock<mpsc::UnboundedReceiver<MCPResponse>>>,
    /// Security manager; present only when `config.enable_auth` is set.
    security_manager: Option<Arc<MCPSecurityManager>>,
    /// Audit log receiving tool execution events.
    audit_logger: Arc<SecurityAuditLogger>,
    /// Network transport; `None` until one is configured.
    network_sender: Arc<RwLock<Option<Arc<dyn NetworkSender>>>>,
    /// Health records keyed by service id.
    service_health: Arc<RwLock<HashMap<String, ServiceHealth>>>,
    /// Sending half of the health event channel.
    health_event_tx: mpsc::UnboundedSender<HealthEvent>,
    /// Receiving half of the health event channel.
    #[allow(dead_code)]
    health_event_rx: Arc<RwLock<mpsc::UnboundedReceiver<HealthEvent>>>,
}
869
/// State of one client session.
#[derive(Debug, Clone)]
pub struct MCPSession {
    /// Identifier of the session.
    pub session_id: String,
    /// Peer the session belongs to.
    pub peer_id: PeerId,
    /// Capabilities declared by the client, if known.
    pub client_capabilities: Option<MCPCapabilities>,
    /// Start time of the session.
    pub started_at: SystemTime,
    /// Time of the most recent activity.
    pub last_activity: SystemTime,
    /// Lifecycle state of the session.
    pub state: MCPSessionState,
    /// URIs of the resources the session is subscribed to.
    pub subscribed_resources: Vec<String>,
}
888
/// Lifecycle state of an [`MCPSession`].
#[derive(Debug, Clone, PartialEq)]
pub enum MCPSessionState {
    Initializing,
    Active,
    Inactive,
    Terminated,
}
901
/// Aggregate statistics of an [`MCPServer`].
#[derive(Debug, Clone)]
pub struct MCPServerStats {
    /// Number of counted requests.
    pub total_requests: u64,
    /// Number of counted responses.
    pub total_responses: u64,
    /// Number of counted errors.
    pub total_errors: u64,
    /// Running mean of tool execution times.
    pub avg_response_time: Duration,
    /// Number of active sessions.
    pub active_sessions: u32,
    /// Number of registered tools; incremented on each registration.
    pub total_tools: u32,
    /// Call count per tool name.
    pub popular_tools: HashMap<String, u64>,
    /// Time at which the stats value was created.
    pub server_started_at: SystemTime,
}
922
923impl Default for MCPServerStats {
924 fn default() -> Self {
925 Self {
926 total_requests: 0,
927 total_responses: 0,
928 total_errors: 0,
929 avg_response_time: Duration::from_millis(0),
930 active_sessions: 0,
931 total_tools: 0,
932 popular_tools: HashMap::new(),
933 server_started_at: SystemTime::now(),
934 }
935 }
936}
937
938impl MCPServer {
939 pub fn new(config: MCPServerConfig) -> Self {
941 let (request_tx, _request_rx) = mpsc::unbounded_channel();
942 let (_response_tx, response_rx) = mpsc::unbounded_channel();
943 let (health_event_tx, health_event_rx) = mpsc::unbounded_channel();
944
945 let security_manager = if config.enable_auth {
947 let secret_key = (0..32).map(|_| rand::random::<u8>()).collect();
949 Some(Arc::new(MCPSecurityManager::new(secret_key, config.rate_limit_rpm)))
950 } else {
951 None
952 };
953
954 let server = Self {
955 config,
956 tools: Arc::new(RwLock::new(HashMap::new())),
957 prompts: Arc::new(RwLock::new(HashMap::new())),
958 resources: Arc::new(RwLock::new(HashMap::new())),
959 sessions: Arc::new(RwLock::new(HashMap::new())),
960 request_handlers: Arc::new(RwLock::new(HashMap::new())),
961 dht: None,
962 local_services: Arc::new(RwLock::new(HashMap::new())),
963 remote_services: Arc::new(RwLock::new(HashMap::new())),
964 stats: Arc::new(RwLock::new(MCPServerStats::default())),
965 request_tx,
966 response_rx: Arc::new(RwLock::new(response_rx)),
967 security_manager,
968 audit_logger: Arc::new(SecurityAuditLogger::new(10000)), network_sender: Arc::new(RwLock::new(None)),
970 service_health: Arc::new(RwLock::new(HashMap::new())),
971 health_event_tx,
972 health_event_rx: Arc::new(RwLock::new(health_event_rx)),
973 };
974
975 server
976 }
977
978 pub fn with_dht(mut self, dht: Arc<RwLock<DHT>>) -> Self {
980 self.dht = Some(dht);
981 self
982 }
983
984 pub async fn with_network_sender(self, sender: Arc<dyn NetworkSender>) -> Self {
986 *self.network_sender.write().await = Some(sender);
987 self
988 }
989
990 pub async fn set_network_sender(&self, sender: Arc<dyn NetworkSender>) {
992 let peer_id = sender.local_peer_id().clone();
993 *self.network_sender.write().await = Some(sender);
994 info!("MCP server network sender configured for peer {}", peer_id);
995 }
996
997 pub async fn start(&self) -> Result<()> {
999 info!("Starting MCP server: {}", self.config.server_name);
1000
1001 self.start_request_processor().await?;
1003
1004 if self.dht.is_some() {
1006 self.start_service_discovery().await?;
1007 }
1008
1009 self.start_health_monitor().await?;
1011
1012 info!("MCP server started successfully");
1013 Ok(())
1014 }
1015
1016 pub async fn register_tool(&self, tool: Tool) -> Result<()> {
1018 let tool_name = tool.definition.name.clone();
1019
1020 self.validate_tool(&tool).await?;
1022
1023 {
1025 let mut tools = self.tools.write().await;
1026 tools.insert(tool_name.clone(), tool);
1027 }
1028
1029 {
1031 let mut stats = self.stats.write().await;
1032 stats.total_tools += 1;
1033 }
1034
1035 if let Some(dht) = &self.dht {
1037 self.register_tool_in_dht(&tool_name, dht).await?;
1038 }
1039
1040 if let Err(e) = self.announce_local_services().await {
1042 warn!("Failed to announce service after tool registration: {}", e);
1043 }
1044
1045 info!("Registered tool: {}", tool_name);
1046 Ok(())
1047 }
1048
1049 async fn validate_tool(&self, tool: &Tool) -> Result<()> {
1051 let tools = self.tools.read().await;
1053 if tools.contains_key(&tool.definition.name) {
1054 return Err(P2PError::MCP(format!("Tool already exists: {}", tool.definition.name)).into());
1055 }
1056
1057 if tool.definition.name.is_empty() || tool.definition.name.len() > 100 {
1059 return Err(P2PError::MCP("Invalid tool name".to_string()).into());
1060 }
1061
1062 if !tool.definition.input_schema.is_object() {
1064 return Err(P2PError::MCP("Tool input schema must be an object".to_string()).into());
1065 }
1066
1067 Ok(())
1068 }
1069
1070 async fn register_tool_in_dht(&self, tool_name: &str, dht: &Arc<RwLock<DHT>>) -> Result<()> {
1072 let key = Key::new(format!("mcp:tool:{}", tool_name).as_bytes());
1073 let service_info = json!({
1074 "tool_name": tool_name,
1075 "node_id": "local_node", "registered_at": SystemTime::now().duration_since(std::time::UNIX_EPOCH).map_err(|e| P2PError::Network(format!("Time error: {}", e)))?.as_secs(),
1077 "capabilities": self.get_server_capabilities().await
1078 });
1079
1080 let dht_guard = dht.read().await;
1081 dht_guard.put(key, serde_json::to_vec(&service_info)?).await?;
1082
1083 Ok(())
1084 }
1085
1086 async fn get_server_capabilities(&self) -> MCPCapabilities {
1088 MCPCapabilities {
1089 experimental: None,
1090 sampling: None,
1091 tools: Some(MCPToolsCapability {
1092 list_changed: Some(true),
1093 }),
1094 prompts: Some(MCPPromptsCapability {
1095 list_changed: Some(true),
1096 }),
1097 resources: Some(MCPResourcesCapability {
1098 subscribe: Some(true),
1099 list_changed: Some(true),
1100 }),
1101 logging: Some(MCPLoggingCapability {
1102 levels: Some(vec![
1103 MCPLogLevel::Debug,
1104 MCPLogLevel::Info,
1105 MCPLogLevel::Warning,
1106 MCPLogLevel::Error,
1107 ]),
1108 }),
1109 }
1110 }
1111
1112 pub async fn call_tool(&self, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
1114 let start_time = Instant::now();
1115
1116 if !self.check_rate_limit(&context.caller_id).await? {
1120 return Err(P2PError::MCP("Rate limit exceeded".to_string()));
1121 }
1122
1123 if !self.check_permission(&context.caller_id, &MCPPermission::ExecuteTools).await? {
1125 return Err(P2PError::MCP("Permission denied: execute tools".to_string()));
1126 }
1127
1128 let tool_security_level = self.get_tool_security_policy(tool_name).await;
1130 let is_trusted = self.is_trusted_peer(&context.caller_id).await;
1131
1132 match tool_security_level {
1133 SecurityLevel::Admin => {
1134 if !self.check_permission(&context.caller_id, &MCPPermission::Admin).await? {
1135 return Err(P2PError::MCP("Permission denied: admin access required".to_string()));
1136 }
1137 }
1138 SecurityLevel::Strong => {
1139 if !is_trusted {
1140 return Err(P2PError::MCP("Permission denied: trusted peer required".to_string()));
1141 }
1142 }
1143 SecurityLevel::Basic => {
1144 if self.config.enable_auth {
1146 if let Some(auth_info) = &context.auth_info {
1147 self.verify_auth_token(&auth_info.token).await?;
1148 } else {
1149 return Err(P2PError::MCP("Authentication required".to_string()));
1150 }
1151 }
1152 }
1153 SecurityLevel::Public => {
1154 }
1156 }
1157
1158 let mut details = HashMap::new();
1160 details.insert("action".to_string(), "tool_call".to_string());
1161 details.insert("tool_name".to_string(), tool_name.to_string());
1162 details.insert("security_level".to_string(), format!("{:?}", tool_security_level));
1163
1164 self.audit_logger.log_event(
1165 "tool_execution".to_string(),
1166 context.caller_id.clone(),
1167 details,
1168 AuditSeverity::Info,
1169 ).await;
1170
1171 let tool_exists = {
1173 let tools = self.tools.read().await;
1174 tools.contains_key(tool_name)
1175 };
1176
1177 if !tool_exists {
1178 return Err(P2PError::MCP(format!("Tool not found: {}", tool_name)).into());
1179 }
1180
1181 let requirements = {
1183 let tools = self.tools.read().await;
1184 let tool = tools.get(tool_name).unwrap(); if let Err(e) = tool.handler.validate(&arguments) {
1188 return Err(P2PError::MCP(format!("Tool validation failed: {}", e)).into());
1189 }
1190
1191 tool.handler.get_requirements()
1193 };
1194
1195 self.check_resource_requirements(&requirements).await?;
1197
1198 let tools_clone = self.tools.clone();
1200 let tool_name_owned = tool_name.to_string();
1201 let execution_timeout = context.timeout.min(requirements.max_execution_time.unwrap_or(context.timeout));
1202
1203 let result = timeout(execution_timeout, async move {
1204 let tools = tools_clone.read().await;
1205 let tool = tools.get(&tool_name_owned).unwrap(); tool.handler.execute(arguments).await
1207 }).await
1208 .map_err(|_| P2PError::MCP("Tool execution timeout".to_string()))?
1209 .map_err(|e| P2PError::MCP(format!("Tool execution failed: {}", e)))?;
1210
1211 let execution_time = start_time.elapsed();
1212
1213 self.update_tool_stats(tool_name, execution_time, true).await;
1215
1216 {
1218 let mut stats = self.stats.write().await;
1219 stats.total_requests += 1;
1220 stats.total_responses += 1;
1221
1222 let new_total_time = stats.avg_response_time.mul_f64(stats.total_responses as f64 - 1.0) + execution_time;
1224 stats.avg_response_time = new_total_time.div_f64(stats.total_responses as f64);
1225
1226 *stats.popular_tools.entry(tool_name.to_string()).or_insert(0) += 1;
1228 }
1229
1230 debug!("Tool '{}' executed in {:?}", tool_name, execution_time);
1231 Ok(result)
1232 }
1233
1234 async fn check_resource_requirements(&self, requirements: &ToolRequirements) -> Result<()> {
1236 if let Some(max_memory) = requirements.max_memory {
1238 if max_memory > self.config.tool_memory_limit {
1239 return Err(P2PError::MCP("Tool memory requirement exceeds limit".to_string()).into());
1240 }
1241 }
1242
1243 if let Some(max_execution_time) = requirements.max_execution_time {
1245 if max_execution_time > self.config.max_tool_execution_time {
1246 return Err(P2PError::MCP("Tool execution time requirement exceeds limit".to_string()).into());
1247 }
1248 }
1249
1250 Ok(())
1253 }
1254
1255 async fn update_tool_stats(&self, tool_name: &str, execution_time: Duration, success: bool) {
1257 let mut tools = self.tools.write().await;
1258 if let Some(tool) = tools.get_mut(tool_name) {
1259 tool.metadata.call_count += 1;
1260 tool.metadata.last_called = Some(SystemTime::now());
1261
1262 let new_total_time = tool.metadata.avg_execution_time.mul_f64(tool.metadata.call_count as f64 - 1.0) + execution_time;
1264 tool.metadata.avg_execution_time = new_total_time.div_f64(tool.metadata.call_count as f64);
1265
1266 if !success {
1268 tool.metadata.health_status = match tool.metadata.health_status {
1269 ToolHealthStatus::Healthy => ToolHealthStatus::Degraded,
1270 ToolHealthStatus::Degraded => ToolHealthStatus::Unhealthy,
1271 other => other,
1272 };
1273 } else if tool.metadata.health_status != ToolHealthStatus::Disabled {
1274 tool.metadata.health_status = ToolHealthStatus::Healthy;
1275 }
1276 }
1277 }
1278
1279 pub async fn list_tools(&self, _cursor: Option<String>) -> Result<(Vec<MCPTool>, Option<String>)> {
1281 let tools = self.tools.read().await;
1282 let tool_definitions: Vec<MCPTool> = tools.values()
1283 .map(|tool| tool.definition.clone())
1284 .collect();
1285
1286 Ok((tool_definitions, None))
1289 }
1290
1291 async fn start_request_processor(&self) -> Result<()> {
1293 let _request_tx = self.request_tx.clone();
1294 let _server_clone = Arc::new(self);
1295
1296 tokio::spawn(async move {
1297 info!("MCP request processor started");
1298
1299 loop {
1304 tokio::time::sleep(Duration::from_millis(100)).await;
1307
1308 break;
1311 }
1312
1313 info!("MCP request processor stopped");
1314 });
1315
1316 Ok(())
1317 }
1318
1319 async fn start_service_discovery(&self) -> Result<()> {
1321 if let Some(dht) = self.dht.clone() {
1322 let _stats = self.stats.clone();
1323 let remote_services = self.remote_services.clone();
1324
1325 tokio::spawn(async move {
1326 info!("MCP service discovery started");
1327
1328 loop {
1329 tokio::time::sleep(SERVICE_DISCOVERY_INTERVAL).await;
1331
1332 let key = Key::new(b"mcp:services");
1334 let dht_guard = dht.read().await;
1335
1336 match dht_guard.get(&key).await {
1337 Some(record) => {
1338 match serde_json::from_slice::<Vec<MCPService>>(&record.value) {
1339 Ok(services) => {
1340 debug!("Discovered {} MCP services", services.len());
1341
1342 {
1344 let mut remote_cache = remote_services.write().await;
1345 for service in services {
1346 remote_cache.insert(service.service_id.clone(), service);
1347 }
1348 }
1349 }
1350 Err(e) => {
1351 debug!("Failed to deserialize services: {}", e);
1352 }
1353 }
1354 }
1355 None => {
1356 debug!("No MCP services found in DHT");
1357 }
1358 }
1359 }
1360 });
1361 }
1362
1363 Ok(())
1364 }
1365
1366 async fn start_health_monitor(&self) -> Result<()> {
1368 if !self.config.health_monitor.enabled {
1369 debug!("Health monitoring is disabled");
1370 return Ok(());
1371 }
1372
1373 info!("Starting health monitoring with interval: {:?}", self.config.health_monitor.health_check_interval);
1374
1375 let service_health = Arc::clone(&self.service_health);
1377 let remote_services = Arc::clone(&self.remote_services);
1378 let network_sender = Arc::clone(&self.network_sender);
1379 let health_event_tx = self.health_event_tx.clone();
1380 let config = self.config.health_monitor.clone();
1381
1382 let health_check_task = {
1384 let service_health = Arc::clone(&service_health);
1385 let remote_services = Arc::clone(&remote_services);
1386 let network_sender = Arc::clone(&network_sender);
1387 let health_event_tx = health_event_tx.clone();
1388 let config = config.clone();
1389
1390 tokio::spawn(async move {
1391 let mut interval = tokio::time::interval(config.health_check_interval);
1392
1393 loop {
1394 interval.tick().await;
1395
1396 let services_to_check: Vec<MCPService> = {
1398 let remote_guard = remote_services.read().await;
1399 remote_guard.values().cloned().collect()
1400 };
1401
1402 for service in services_to_check {
1404 if let Some(sender) = network_sender.read().await.as_ref() {
1405 Self::perform_health_check(
1406 &service,
1407 sender.as_ref(),
1408 &service_health,
1409 &health_event_tx,
1410 &config,
1411 ).await;
1412 }
1413 }
1414 }
1415 })
1416 };
1417
1418 let heartbeat_task = {
1420 let network_sender = Arc::clone(&network_sender);
1421 let health_event_tx = health_event_tx.clone();
1422 let config = config.clone();
1423
1424 tokio::spawn(async move {
1425 let mut interval = tokio::time::interval(config.heartbeat_interval);
1426
1427 loop {
1428 interval.tick().await;
1429
1430 if let Some(sender) = network_sender.read().await.as_ref() {
1431 Self::send_heartbeat(
1432 sender.as_ref(),
1433 &health_event_tx,
1434 ).await;
1435 }
1436 }
1437 })
1438 };
1439
1440 let timeout_task = {
1442 let service_health = Arc::clone(&service_health);
1443 let health_event_tx = health_event_tx.clone();
1444 let config = config.clone();
1445
1446 tokio::spawn(async move {
1447 let mut interval = tokio::time::interval(Duration::from_secs(30)); loop {
1450 interval.tick().await;
1451
1452 Self::check_heartbeat_timeouts(
1453 &service_health,
1454 &health_event_tx,
1455 &config,
1456 ).await;
1457 }
1458 })
1459 };
1460
1461 tokio::spawn(async move {
1463 tokio::select! {
1464 _ = health_check_task => debug!("Health check task completed"),
1465 _ = heartbeat_task => debug!("Heartbeat task completed"),
1466 _ = timeout_task => debug!("Timeout monitoring task completed"),
1467 }
1468 });
1469
1470 info!("Health monitoring started successfully");
1471 Ok(())
1472 }
1473
1474 async fn perform_health_check(
1476 service: &MCPService,
1477 network_sender: &dyn NetworkSender,
1478 service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1479 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1480 config: &HealthMonitorConfig,
1481 ) {
1482 let start_time = Instant::now();
1483 let service_id = service.service_id.clone();
1484 let peer_id = service.node_id.clone();
1485
1486 let health_check_message = MCPMessage::CallTool {
1489 name: "health_check".to_string(),
1490 arguments: json!({
1491 "service_id": service_id,
1492 "timestamp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
1493 }),
1494 };
1495
1496 let result = match serde_json::to_vec(&health_check_message) {
1498 Ok(data) => {
1499 timeout(
1500 config.health_check_timeout,
1501 network_sender.send_message(&peer_id, MCP_PROTOCOL, data)
1502 ).await
1503 }
1504 Err(e) => {
1505 debug!("Failed to serialize health check message: {}", e);
1506 return;
1507 }
1508 };
1509
1510 let response_time = start_time.elapsed();
1511 let success = result.is_ok() && result.unwrap().is_ok();
1512
1513 let mut health_guard = service_health.write().await;
1515 let health = health_guard.entry(service_id.clone()).or_insert_with(|| ServiceHealth {
1516 service_id: service_id.clone(),
1517 status: ServiceHealthStatus::Unknown,
1518 last_health_check: None,
1519 last_heartbeat: None,
1520 failure_count: 0,
1521 success_count: 0,
1522 avg_response_time: Duration::from_millis(0),
1523 error_message: None,
1524 health_history: Vec::new(),
1525 });
1526
1527 let check_result = HealthCheckResult {
1529 timestamp: SystemTime::now(),
1530 success,
1531 response_time,
1532 error_message: if success { None } else { Some("Health check failed".to_string()) },
1533 };
1534
1535 health.health_history.push(check_result);
1536 if health.health_history.len() > 10 {
1537 health.health_history.remove(0);
1538 }
1539
1540 let previous_status = health.status;
1542 if success {
1543 health.failure_count = 0;
1544 health.success_count += 1;
1545 health.last_health_check = Some(SystemTime::now());
1546
1547 if health.success_count >= config.success_threshold {
1548 health.status = ServiceHealthStatus::Healthy;
1549 health.error_message = None;
1550 }
1551 } else {
1552 health.success_count = 0;
1553 health.failure_count += 1;
1554
1555 if health.failure_count >= config.failure_threshold {
1556 health.status = ServiceHealthStatus::Unhealthy;
1557 health.error_message = Some("Health check failures exceeded threshold".to_string());
1558 }
1559 }
1560
1561 let total_time: Duration = health.health_history.iter().map(|h| h.response_time).sum();
1563 health.avg_response_time = total_time / health.health_history.len() as u32;
1564
1565 if previous_status != health.status {
1567 let event = match health.status {
1568 ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1569 service_id: service_id.clone(),
1570 peer_id: peer_id.clone(),
1571 },
1572 ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1573 service_id: service_id.clone(),
1574 peer_id: peer_id.clone(),
1575 error: health.error_message.clone().unwrap_or_else(|| "Unknown error".to_string()),
1576 },
1577 ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1578 service_id: service_id.clone(),
1579 peer_id: peer_id.clone(),
1580 reason: "Performance degradation detected".to_string(),
1581 },
1582 _ => return, };
1584
1585 if let Err(e) = health_event_tx.send(event) {
1586 debug!("Failed to send health event: {}", e);
1587 }
1588 }
1589 }
1590
1591 async fn send_heartbeat(
1593 network_sender: &dyn NetworkSender,
1594 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1595 ) {
1596 let heartbeat = Heartbeat {
1597 service_id: "mcp-server".to_string(),
1598 peer_id: network_sender.local_peer_id().clone(),
1599 timestamp: SystemTime::now(),
1600 load: 0.1, available_tools: vec![], capabilities: MCPCapabilities {
1603 experimental: None,
1604 sampling: None,
1605 tools: Some(MCPToolsCapability { list_changed: Some(true) }),
1606 prompts: None,
1607 resources: None,
1608 logging: None,
1609 },
1610 };
1611
1612 let heartbeat_message = MCPMessage::CallTool {
1614 name: "heartbeat".to_string(),
1615 arguments: serde_json::to_value(&heartbeat).unwrap_or(json!({})),
1616 };
1617
1618 if let Ok(data) = serde_json::to_vec(&heartbeat_message) {
1619 debug!("Sending heartbeat for service: {}", heartbeat.service_id);
1622
1623 let event = HealthEvent::HeartbeatReceived {
1625 service_id: heartbeat.service_id.clone(),
1626 peer_id: heartbeat.peer_id.clone(),
1627 load: heartbeat.load,
1628 };
1629
1630 if let Err(e) = health_event_tx.send(event) {
1631 debug!("Failed to send heartbeat event: {}", e);
1632 }
1633 }
1634 }
1635
1636 async fn check_heartbeat_timeouts(
1638 service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1639 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1640 config: &HealthMonitorConfig,
1641 ) {
1642 let now = SystemTime::now();
1643 let mut health_guard = service_health.write().await;
1644
1645 for (service_id, health) in health_guard.iter_mut() {
1646 if let Some(last_heartbeat) = health.last_heartbeat {
1647 if let Ok(duration) = now.duration_since(last_heartbeat) {
1648 if duration > config.heartbeat_timeout {
1649 let previous_status = health.status;
1650 health.status = ServiceHealthStatus::Unhealthy;
1651 health.error_message = Some("Heartbeat timeout".to_string());
1652
1653 if previous_status != ServiceHealthStatus::Unhealthy {
1655 let event = HealthEvent::HeartbeatTimeout {
1656 service_id: service_id.clone(),
1657 peer_id: PeerId::from("unknown".to_string()), };
1659
1660 if let Err(e) = health_event_tx.send(event) {
1661 debug!("Failed to send timeout event: {}", e);
1662 }
1663 }
1664 }
1665 }
1666 }
1667 }
1668 }
1669
1670 pub async fn get_stats(&self) -> MCPServerStats {
1672 self.stats.read().await.clone()
1673 }
1674
1675 pub async fn handle_heartbeat(&self, heartbeat: Heartbeat) -> Result<()> {
1677 let service_id = heartbeat.service_id.clone();
1678 let peer_id = heartbeat.peer_id.clone();
1679
1680 {
1682 let mut health_guard = self.service_health.write().await;
1683 let health = health_guard.entry(service_id.clone()).or_insert_with(|| ServiceHealth {
1684 service_id: service_id.clone(),
1685 status: ServiceHealthStatus::Healthy,
1686 last_health_check: None,
1687 last_heartbeat: None,
1688 failure_count: 0,
1689 success_count: 0,
1690 avg_response_time: Duration::from_millis(0),
1691 error_message: None,
1692 health_history: Vec::new(),
1693 });
1694
1695 health.last_heartbeat = Some(heartbeat.timestamp);
1696 health.status = ServiceHealthStatus::Healthy;
1697 health.failure_count = 0;
1698 health.error_message = None;
1699 }
1700
1701 let event = HealthEvent::HeartbeatReceived {
1703 service_id,
1704 peer_id,
1705 load: heartbeat.load,
1706 };
1707
1708 if let Err(e) = self.health_event_tx.send(event) {
1709 debug!("Failed to send heartbeat received event: {}", e);
1710 }
1711
1712 info!("Heartbeat received from service: {} (load: {:.2})", heartbeat.service_id, heartbeat.load);
1713 Ok(())
1714 }
1715
1716 pub async fn get_service_health(&self, service_id: &str) -> Option<ServiceHealth> {
1718 let health_guard = self.service_health.read().await;
1719 health_guard.get(service_id).cloned()
1720 }
1721
1722 pub async fn get_all_service_health(&self) -> HashMap<String, ServiceHealth> {
1724 self.service_health.read().await.clone()
1725 }
1726
1727 pub async fn get_healthy_services(&self) -> Vec<String> {
1729 let health_guard = self.service_health.read().await;
1730 health_guard
1731 .iter()
1732 .filter(|(_, health)| health.status == ServiceHealthStatus::Healthy)
1733 .map(|(service_id, _)| service_id.clone())
1734 .collect()
1735 }
1736
1737 pub async fn update_service_health(&self, service_id: String, status: ServiceHealthStatus, error_message: Option<String>) {
1739 let mut health_guard = self.service_health.write().await;
1740 if let Some(health) = health_guard.get_mut(&service_id) {
1741 let previous_status = health.status;
1742 health.status = status;
1743 health.error_message = error_message.clone();
1744
1745 if previous_status != status {
1747 let event = match status {
1748 ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1749 service_id: service_id.clone(),
1750 peer_id: PeerId::from("manual".to_string()),
1751 },
1752 ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1753 service_id: service_id.clone(),
1754 peer_id: PeerId::from("manual".to_string()),
1755 error: error_message.unwrap_or_else(|| "Manually set to unhealthy".to_string()),
1756 },
1757 ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1758 service_id: service_id.clone(),
1759 peer_id: PeerId::from("manual".to_string()),
1760 reason: "Manually set to degraded".to_string(),
1761 },
1762 _ => return,
1763 };
1764
1765 if let Err(e) = self.health_event_tx.send(event) {
1766 debug!("Failed to send manual health update event: {}", e);
1767 }
1768 }
1769 }
1770 }
1771
1772 pub fn subscribe_health_events(&self) -> mpsc::UnboundedReceiver<HealthEvent> {
1774 let (_tx, rx) = mpsc::unbounded_channel();
1777 rx
1778 }
1779
1780 pub async fn is_service_healthy(&self, service_id: &str) -> bool {
1782 if let Some(health) = self.get_service_health(service_id).await {
1783 health.status == ServiceHealthStatus::Healthy
1784 } else {
1785 false
1786 }
1787 }
1788
1789 pub async fn get_service_load_info(&self) -> HashMap<String, f32> {
1791 let health_guard = self.service_health.read().await;
1794 health_guard
1795 .iter()
1796 .map(|(service_id, health)| {
1797 let load = match health.status {
1798 ServiceHealthStatus::Healthy => 0.1,
1799 ServiceHealthStatus::Degraded => 0.7,
1800 ServiceHealthStatus::Unhealthy => 1.0,
1801 ServiceHealthStatus::Disabled => 0.0,
1802 ServiceHealthStatus::Unknown => 0.5,
1803 };
1804 (service_id.clone(), load)
1805 })
1806 .collect()
1807 }
1808
1809
1810 pub async fn call_remote_tool(&self, peer_id: &PeerId, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
1812 let request_id = uuid::Uuid::new_v4().to_string();
1813
1814 let mcp_message = MCPMessage::CallTool {
1816 name: tool_name.to_string(),
1817 arguments,
1818 };
1819
1820 let p2p_message = P2PMCPMessage {
1822 message_type: P2PMCPMessageType::Request,
1823 message_id: request_id.clone(),
1824 source_peer: context.caller_id.clone(),
1825 target_peer: Some(peer_id.clone()),
1826 timestamp: SystemTime::now()
1827 .duration_since(std::time::UNIX_EPOCH)
1828 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1829 .as_secs(),
1830 payload: mcp_message,
1831 ttl: 5, };
1833
1834 let message_data = serde_json::to_vec(&p2p_message)
1836 .map_err(|e| P2PError::Serialization(e))?;
1837
1838 if message_data.len() > MAX_MESSAGE_SIZE {
1839 return Err(P2PError::MCP("Message too large".to_string()));
1840 }
1841
1842 let (response_tx, _response_rx) = oneshot::channel::<MCPResponse>();
1844
1845 {
1847 let mut handlers = self.request_handlers.write().await;
1848 handlers.insert(request_id.clone(), response_tx);
1849 }
1850
1851 if let Some(ref network_sender) = *self.network_sender.read().await {
1853 network_sender.send_message(peer_id, MCP_PROTOCOL, message_data).await?;
1855
1856 debug!("MCP remote tool call sent to peer {}, tool: {}", peer_id, tool_name);
1859
1860 Ok(json!({
1863 "status": "sent",
1864 "message": "Remote tool call sent successfully",
1865 "peer_id": peer_id,
1866 "tool_name": tool_name
1867 }))
1868 } else {
1869 Err(P2PError::MCP("Network sender not configured".to_string()))
1870 }
1871 }
1872
1873 pub async fn handle_p2p_message(&self, message_data: &[u8], source_peer: &PeerId) -> Result<Option<Vec<u8>>> {
1875 let p2p_message: P2PMCPMessage = serde_json::from_slice(message_data)
1877 .map_err(|e| P2PError::Serialization(e))?;
1878
1879 debug!("Received MCP message from {}: {:?}", source_peer, p2p_message.message_type);
1880
1881 if let MCPMessage::CallTool { name, arguments } = &p2p_message.payload {
1883 if name == "heartbeat" {
1884 if let Ok(heartbeat) = serde_json::from_value::<Heartbeat>(arguments.clone()) {
1885 self.handle_heartbeat(heartbeat).await?;
1886 return Ok(None);
1887 }
1888 } else if name == "health_check" {
1889 let health_response = MCPMessage::CallToolResult {
1891 content: vec![],
1892 is_error: false,
1893 };
1894
1895 let response_message = P2PMCPMessage {
1896 message_type: P2PMCPMessageType::Response,
1897 message_id: p2p_message.message_id.clone(),
1898 source_peer: source_peer.clone(),
1899 target_peer: Some(p2p_message.source_peer.clone()),
1900 timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(),
1901 payload: health_response,
1902 ttl: 3,
1903 };
1904
1905 return Ok(Some(serde_json::to_vec(&response_message)?));
1906 }
1907 }
1908
1909 match p2p_message.message_type {
1910 P2PMCPMessageType::Request => {
1911 self.handle_remote_request(p2p_message).await
1912 }
1913 P2PMCPMessageType::Response => {
1914 self.handle_remote_response(p2p_message).await?;
1915 Ok(None) }
1917 P2PMCPMessageType::ServiceAdvertisement => {
1918 self.handle_service_advertisement(p2p_message).await?;
1919 Ok(None)
1920 }
1921 P2PMCPMessageType::ServiceDiscovery => {
1922 self.handle_service_discovery(p2p_message).await
1923 }
1924 P2PMCPMessageType::Heartbeat => {
1925 debug!("Received heartbeat message");
1926 Ok(None)
1927 }
1928 P2PMCPMessageType::HealthCheck => {
1929 debug!("Received health check message");
1930 Ok(None)
1931 }
1932 }
1933 }
1934
1935 async fn handle_remote_request(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
1937 match message.payload {
1938 MCPMessage::CallTool { name, arguments } => {
1939 let context = MCPCallContext {
1940 caller_id: message.source_peer.clone(),
1941 timestamp: SystemTime::now(),
1942 timeout: DEFAULT_CALL_TIMEOUT,
1943 auth_info: None,
1944 metadata: HashMap::new(),
1945 };
1946
1947 let result = self.call_tool(&name, arguments, context).await;
1949
1950 let response_payload = match result {
1952 Ok(value) => MCPMessage::CallToolResult {
1953 content: vec![MCPContent::Text { text: value.to_string() }],
1954 is_error: false,
1955 },
1956 Err(e) => MCPMessage::Error {
1957 code: -1,
1958 message: e.to_string(),
1959 data: None,
1960 },
1961 };
1962
1963 let response_message = P2PMCPMessage {
1964 message_type: P2PMCPMessageType::Response,
1965 message_id: message.message_id,
1966 source_peer: "local".to_string(), target_peer: Some(message.source_peer),
1968 timestamp: SystemTime::now()
1969 .duration_since(std::time::UNIX_EPOCH)
1970 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1971 .as_secs(),
1972 payload: response_payload,
1973 ttl: message.ttl.saturating_sub(1),
1974 };
1975
1976 let response_data = serde_json::to_vec(&response_message)
1978 .map_err(|e| P2PError::Serialization(e))?;
1979
1980 Ok(Some(response_data))
1981 }
1982 MCPMessage::ListTools { cursor: _ } => {
1983 let (tools, _) = self.list_tools(None).await?;
1984
1985 let response_payload = MCPMessage::ListToolsResult {
1986 tools,
1987 next_cursor: None,
1988 };
1989
1990 let response_message = P2PMCPMessage {
1991 message_type: P2PMCPMessageType::Response,
1992 message_id: message.message_id,
1993 source_peer: "local".to_string(), target_peer: Some(message.source_peer),
1995 timestamp: SystemTime::now()
1996 .duration_since(std::time::UNIX_EPOCH)
1997 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1998 .as_secs(),
1999 payload: response_payload,
2000 ttl: message.ttl.saturating_sub(1),
2001 };
2002
2003 let response_data = serde_json::to_vec(&response_message)
2004 .map_err(|e| P2PError::Serialization(e))?;
2005
2006 Ok(Some(response_data))
2007 }
2008 _ => {
2009 let error_response = P2PMCPMessage {
2011 message_type: P2PMCPMessageType::Response,
2012 message_id: message.message_id,
2013 source_peer: "local".to_string(), target_peer: Some(message.source_peer),
2015 timestamp: SystemTime::now()
2016 .duration_since(std::time::UNIX_EPOCH)
2017 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
2018 .as_secs(),
2019 payload: MCPMessage::Error {
2020 code: -2,
2021 message: "Unsupported request type".to_string(),
2022 data: None,
2023 },
2024 ttl: message.ttl.saturating_sub(1),
2025 };
2026
2027 let response_data = serde_json::to_vec(&error_response)
2028 .map_err(|e| P2PError::Serialization(e))?;
2029
2030 Ok(Some(response_data))
2031 }
2032 }
2033 }
2034
2035 pub async fn generate_auth_token(&self, peer_id: &PeerId, permissions: Vec<MCPPermission>, ttl: Duration) -> Result<String> {
2039 if let Some(security_manager) = &self.security_manager {
2040 let token = security_manager.generate_token(peer_id, permissions, ttl).await?;
2041
2042 let mut details = HashMap::new();
2044 details.insert("action".to_string(), "token_generated".to_string());
2045 details.insert("ttl_seconds".to_string(), ttl.as_secs().to_string());
2046
2047 self.audit_logger.log_event(
2048 "authentication".to_string(),
2049 peer_id.clone(),
2050 details,
2051 AuditSeverity::Info,
2052 ).await;
2053
2054 Ok(token)
2055 } else {
2056 Err(P2PError::MCP("Authentication not enabled".to_string()))
2057 }
2058 }
2059
2060 pub async fn verify_auth_token(&self, token: &str) -> Result<TokenPayload> {
2062 if let Some(security_manager) = &self.security_manager {
2063 match security_manager.verify_token(token).await {
2064 Ok(payload) => {
2065 let mut details = HashMap::new();
2067 details.insert("action".to_string(), "token_verified".to_string());
2068 details.insert("subject".to_string(), payload.sub.clone());
2069
2070 self.audit_logger.log_event(
2071 "authentication".to_string(),
2072 payload.iss.clone(),
2073 details,
2074 AuditSeverity::Info,
2075 ).await;
2076
2077 Ok(payload)
2078 }
2079 Err(e) => {
2080 let mut details = HashMap::new();
2082 details.insert("action".to_string(), "token_verification_failed".to_string());
2083 details.insert("error".to_string(), e.to_string());
2084
2085 self.audit_logger.log_event(
2086 "authentication".to_string(),
2087 "unknown".to_string(),
2088 details,
2089 AuditSeverity::Warning,
2090 ).await;
2091
2092 Err(e)
2093 }
2094 }
2095 } else {
2096 Err(P2PError::MCP("Authentication not enabled".to_string()))
2097 }
2098 }
2099
2100 pub async fn check_permission(&self, peer_id: &PeerId, permission: &MCPPermission) -> Result<bool> {
2102 if let Some(security_manager) = &self.security_manager {
2103 security_manager.check_permission(peer_id, permission).await
2104 } else {
2105 Ok(true)
2107 }
2108 }
2109
2110 pub async fn check_rate_limit(&self, peer_id: &PeerId) -> Result<bool> {
2112 if let Some(security_manager) = &self.security_manager {
2113 let allowed = security_manager.check_rate_limit(peer_id).await?;
2114
2115 if !allowed {
2116 let mut details = HashMap::new();
2118 details.insert("action".to_string(), "rate_limit_exceeded".to_string());
2119
2120 self.audit_logger.log_event(
2121 "rate_limiting".to_string(),
2122 peer_id.clone(),
2123 details,
2124 AuditSeverity::Warning,
2125 ).await;
2126 }
2127
2128 Ok(allowed)
2129 } else {
2130 Ok(true)
2132 }
2133 }
2134
2135 pub async fn grant_permission(&self, peer_id: &PeerId, permission: MCPPermission) -> Result<()> {
2137 if let Some(security_manager) = &self.security_manager {
2138 security_manager.grant_permission(peer_id, permission.clone()).await?;
2139
2140 let mut details = HashMap::new();
2142 details.insert("action".to_string(), "permission_granted".to_string());
2143 details.insert("permission".to_string(), permission.as_str().to_string());
2144
2145 self.audit_logger.log_event(
2146 "authorization".to_string(),
2147 peer_id.clone(),
2148 details,
2149 AuditSeverity::Info,
2150 ).await;
2151
2152 Ok(())
2153 } else {
2154 Err(P2PError::MCP("Security not enabled".to_string()))
2155 }
2156 }
2157
2158 pub async fn revoke_permission(&self, peer_id: &PeerId, permission: &MCPPermission) -> Result<()> {
2160 if let Some(security_manager) = &self.security_manager {
2161 security_manager.revoke_permission(peer_id, permission).await?;
2162
2163 let mut details = HashMap::new();
2165 details.insert("action".to_string(), "permission_revoked".to_string());
2166 details.insert("permission".to_string(), permission.as_str().to_string());
2167
2168 self.audit_logger.log_event(
2169 "authorization".to_string(),
2170 peer_id.clone(),
2171 details,
2172 AuditSeverity::Info,
2173 ).await;
2174
2175 Ok(())
2176 } else {
2177 Err(P2PError::MCP("Security not enabled".to_string()))
2178 }
2179 }
2180
2181 pub async fn add_trusted_peer(&self, peer_id: PeerId) -> Result<()> {
2183 if let Some(security_manager) = &self.security_manager {
2184 security_manager.add_trusted_peer(peer_id.clone()).await?;
2185
2186 let mut details = HashMap::new();
2188 details.insert("action".to_string(), "trusted_peer_added".to_string());
2189
2190 self.audit_logger.log_event(
2191 "trust_management".to_string(),
2192 peer_id,
2193 details,
2194 AuditSeverity::Info,
2195 ).await;
2196
2197 Ok(())
2198 } else {
2199 Err(P2PError::MCP("Security not enabled".to_string()))
2200 }
2201 }
2202
2203 pub async fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
2205 if let Some(security_manager) = &self.security_manager {
2206 security_manager.is_trusted_peer(peer_id).await
2207 } else {
2208 false
2209 }
2210 }
2211
2212 pub async fn set_tool_security_policy(&self, tool_name: String, level: SecurityLevel) -> Result<()> {
2214 if let Some(security_manager) = &self.security_manager {
2215 security_manager.set_tool_policy(tool_name.clone(), level.clone()).await?;
2216
2217 let mut details = HashMap::new();
2219 details.insert("action".to_string(), "tool_policy_set".to_string());
2220 details.insert("tool_name".to_string(), tool_name);
2221 details.insert("security_level".to_string(), format!("{:?}", level));
2222
2223 self.audit_logger.log_event(
2224 "security_policy".to_string(),
2225 "system".to_string(),
2226 details,
2227 AuditSeverity::Info,
2228 ).await;
2229
2230 Ok(())
2231 } else {
2232 Err(P2PError::MCP("Security not enabled".to_string()))
2233 }
2234 }
2235
2236 pub async fn get_tool_security_policy(&self, tool_name: &str) -> SecurityLevel {
2238 if let Some(security_manager) = &self.security_manager {
2239 security_manager.get_tool_policy(tool_name).await
2240 } else {
2241 SecurityLevel::Public
2242 }
2243 }
2244
2245 pub async fn get_peer_security_stats(&self, peer_id: &PeerId) -> Option<PeerACL> {
2247 if let Some(security_manager) = &self.security_manager {
2248 security_manager.get_peer_stats(peer_id).await
2249 } else {
2250 None
2251 }
2252 }
2253
2254 pub async fn get_security_audit(&self, limit: Option<usize>) -> Vec<SecurityAuditEntry> {
2256 self.audit_logger.get_recent_entries(limit).await
2257 }
2258
2259 pub async fn security_cleanup(&self) -> Result<()> {
2261 if let Some(security_manager) = &self.security_manager {
2262 security_manager.cleanup().await?;
2263 }
2264 Ok(())
2265 }
2266
2267 async fn handle_remote_response(&self, message: P2PMCPMessage) -> Result<()> {
2269 let response_tx = {
2271 let mut handlers = self.request_handlers.write().await;
2272 handlers.remove(&message.message_id)
2273 };
2274
2275 if let Some(tx) = response_tx {
2276 let response = MCPResponse {
2277 request_id: message.message_id,
2278 message: message.payload,
2279 timestamp: SystemTime::now(),
2280 processing_time: Duration::from_millis(0), };
2282
2283 let _ = tx.send(response);
2285 } else {
2286 debug!("Received response for unknown request: {}", message.message_id);
2287 }
2288
2289 Ok(())
2290 }
2291
2292 pub async fn announce_local_services(&self) -> Result<()> {
2294 if let Some(dht) = &self.dht {
2295 let local_service = self.create_local_service_announcement().await?;
2297
2298 self.store_service_in_dht(&local_service, dht).await?;
2300
2301 if let Some(network_sender) = &*self.network_sender.read().await {
2303 self.broadcast_service_announcement(&local_service, network_sender).await?;
2304 }
2305
2306 info!("Announced local MCP service with {} tools", local_service.tools.len());
2307 }
2308
2309 Ok(())
2310 }
2311
2312 async fn create_local_service_announcement(&self) -> Result<MCPService> {
2314 let tools = self.tools.read().await;
2315 let tool_names: Vec<String> = tools.keys().cloned().collect();
2316
2317 let service = MCPService {
2318 service_id: format!("mcp-{}", self.config.server_name),
2319 node_id: "local".to_string(), tools: tool_names,
2321 capabilities: MCPCapabilities {
2322 experimental: None,
2323 sampling: None,
2324 tools: Some(MCPToolsCapability {
2325 list_changed: Some(true),
2326 }),
2327 prompts: None,
2328 resources: None,
2329 logging: None,
2330 },
2331 metadata: MCPServiceMetadata {
2332 name: self.config.server_name.clone(),
2333 version: self.config.server_version.clone(),
2334 description: Some("P2P MCP Service".to_string()),
2335 tags: vec!["p2p".to_string(), "mcp".to_string()],
2336 health_status: ServiceHealthStatus::Healthy,
2337 load_metrics: self.get_current_load_metrics().await,
2338 },
2339 registered_at: SystemTime::now(),
2340 endpoint: MCPEndpoint {
2341 protocol: "p2p".to_string(),
2342 address: "local".to_string(), port: None,
2344 tls: true,
2345 auth_required: false,
2346 },
2347 };
2348
2349 Ok(service)
2350 }
2351
2352 async fn get_current_load_metrics(&self) -> ServiceLoadMetrics {
2354 let stats = self.stats.read().await;
2355
2356 ServiceLoadMetrics {
2357 active_requests: 0, requests_per_second: stats.total_requests as f64 / 60.0, avg_response_time_ms: stats.avg_response_time.as_millis() as f64,
2360 error_rate: if stats.total_requests > 0 {
2361 stats.total_errors as f64 / stats.total_requests as f64
2362 } else {
2363 0.0
2364 },
2365 cpu_usage: 0.0, memory_usage: 0, }
2368 }
2369
2370 async fn store_service_in_dht(&self, service: &MCPService, dht: &Arc<RwLock<DHT>>) -> Result<()> {
2372 let service_key = Key::new(format!("mcp:service:{}", service.service_id).as_bytes());
2374 let service_data = serde_json::to_vec(service)
2375 .map_err(|e| P2PError::Serialization(e))?;
2376
2377 let mut dht_guard = dht.write().await;
2378 dht_guard.put(service_key.clone(), service_data).await
2379 .map_err(|e| P2PError::DHT(format!("Failed to store service in DHT: {}", e)))?;
2380
2381 let services_key = Key::new(b"mcp:services:index");
2383 let mut service_ids = match dht_guard.get(&services_key).await {
2384 Some(record) => {
2385 serde_json::from_slice::<Vec<String>>(&record.value).unwrap_or_default()
2386 }
2387 None => Vec::new(),
2388 };
2389
2390 if !service_ids.contains(&service.service_id) {
2391 service_ids.push(service.service_id.clone());
2392
2393 let index_data = serde_json::to_vec(&service_ids)
2394 .map_err(|e| P2PError::Serialization(e))?;
2395
2396 dht_guard.put(services_key, index_data).await
2397 .map_err(|e| P2PError::DHT(format!("Failed to update services index: {}", e)))?;
2398 }
2399
2400 Ok(())
2401 }
2402
2403 async fn broadcast_service_announcement(&self, service: &MCPService, network_sender: &Arc<dyn NetworkSender>) -> Result<()> {
2405 let announcement = P2PMCPMessage {
2406 message_type: P2PMCPMessageType::ServiceAdvertisement,
2407 message_id: uuid::Uuid::new_v4().to_string(),
2408 source_peer: network_sender.local_peer_id().clone(),
2409 target_peer: None, timestamp: SystemTime::now()
2411 .duration_since(std::time::UNIX_EPOCH)
2412 .unwrap_or_default()
2413 .as_secs(),
2414 payload: MCPMessage::ListToolsResult {
2415 tools: service.tools.iter().map(|tool_name| MCPTool {
2416 name: tool_name.clone(),
2417 description: format!("Tool from {}", service.metadata.name),
2418 input_schema: json!({"type": "object"}),
2419 }).collect(),
2420 next_cursor: None,
2421 },
2422 ttl: 3,
2423 };
2424
2425 let announcement_data = serde_json::to_vec(&announcement)
2426 .map_err(|e| P2PError::Serialization(e))?;
2427
2428 debug!("Service announcement prepared for broadcast: {} tools", service.tools.len());
2431
2432 Ok(())
2433 }
2434
2435 pub async fn discover_remote_services(&self) -> Result<Vec<MCPService>> {
2437 if let Some(dht) = &self.dht {
2438 let services_key = Key::new(b"mcp:services:index");
2439 let dht_guard = dht.read().await;
2440
2441 let service_ids = match dht_guard.get(&services_key).await {
2442 Some(record) => {
2443 serde_json::from_slice::<Vec<String>>(&record.value).unwrap_or_default()
2444 }
2445 None => {
2446 debug!("No services index found in DHT");
2447 return Ok(Vec::new());
2448 }
2449 };
2450
2451 let mut discovered_services = Vec::new();
2452
2453 for service_id in service_ids {
2454 let service_key = Key::new(format!("mcp:service:{}", service_id).as_bytes());
2455
2456 if let Some(record) = dht_guard.get(&service_key).await {
2457 match serde_json::from_slice::<MCPService>(&record.value) {
2458 Ok(service) => {
2459 if service.service_id != format!("mcp-{}", self.config.server_name) {
2461 discovered_services.push(service);
2462 }
2463 }
2464 Err(e) => {
2465 warn!("Failed to deserialize service {}: {}", service_id, e);
2466 }
2467 }
2468 }
2469 }
2470
2471 debug!("Discovered {} remote MCP services", discovered_services.len());
2472 Ok(discovered_services)
2473 } else {
2474 Ok(Vec::new())
2475 }
2476 }
2477
2478 pub async fn refresh_service_discovery(&self) -> Result<()> {
2480 let discovered_services = self.discover_remote_services().await?;
2481
2482 {
2484 let mut remote_cache = self.remote_services.write().await;
2485 remote_cache.clear();
2486
2487 for service in discovered_services {
2488 remote_cache.insert(service.service_id.clone(), service);
2489 }
2490 }
2491
2492 {
2494 let local_service = self.create_local_service_announcement().await?;
2495 let mut local_cache = self.local_services.write().await;
2496 local_cache.insert(local_service.service_id.clone(), local_service);
2497 }
2498
2499 debug!("Service discovery refresh completed");
2500 Ok(())
2501 }
2502
2503 pub async fn get_all_services(&self) -> Result<Vec<MCPService>> {
2505 let mut all_services = Vec::new();
2506
2507 {
2509 let local_services = self.local_services.read().await;
2510 all_services.extend(local_services.values().cloned());
2511 }
2512
2513 {
2515 let remote_services = self.remote_services.read().await;
2516 all_services.extend(remote_services.values().cloned());
2517 }
2518
2519 Ok(all_services)
2520 }
2521
2522 pub async fn find_services_with_tool(&self, tool_name: &str) -> Result<Vec<MCPService>> {
2524 let all_services = self.get_all_services().await?;
2525
2526 let matching_services = all_services
2527 .into_iter()
2528 .filter(|service| service.tools.contains(&tool_name.to_string()))
2529 .collect();
2530
2531 Ok(matching_services)
2532 }
2533
2534 pub async fn handle_service_advertisement(&self, message: P2PMCPMessage) -> Result<()> {
2536 debug!("Received service advertisement from peer: {}", message.source_peer);
2537
2538 if let MCPMessage::ListToolsResult { tools, .. } = message.payload {
2540 let service = MCPService {
2542 service_id: format!("mcp-{}", message.source_peer),
2543 node_id: message.source_peer.clone(),
2544 tools: tools.iter().map(|t| t.name.clone()).collect(),
2545 capabilities: MCPCapabilities {
2546 experimental: None,
2547 sampling: None,
2548 tools: Some(MCPToolsCapability {
2549 list_changed: Some(true),
2550 }),
2551 prompts: None,
2552 resources: None,
2553 logging: None,
2554 },
2555 metadata: MCPServiceMetadata {
2556 name: format!("Remote MCP Service - {}", message.source_peer),
2557 version: "unknown".to_string(),
2558 description: Some("Remote P2P MCP Service".to_string()),
2559 tags: vec!["p2p".to_string(), "remote".to_string()],
2560 health_status: ServiceHealthStatus::Healthy,
2561 load_metrics: ServiceLoadMetrics {
2562 active_requests: 0,
2563 requests_per_second: 0.0,
2564 avg_response_time_ms: 0.0,
2565 error_rate: 0.0,
2566 cpu_usage: 0.0,
2567 memory_usage: 0,
2568 },
2569 },
2570 registered_at: SystemTime::now(),
2571 endpoint: MCPEndpoint {
2572 protocol: "p2p".to_string(),
2573 address: message.source_peer.clone(),
2574 port: None,
2575 tls: true,
2576 auth_required: false,
2577 },
2578 };
2579
2580 {
2582 let mut remote_services = self.remote_services.write().await;
2583 remote_services.insert(service.service_id.clone(), service.clone());
2584 }
2585
2586 if let Some(dht) = &self.dht {
2588 if let Err(e) = self.store_service_in_dht(&service, dht).await {
2589 warn!("Failed to store remote service in DHT: {}", e);
2590 }
2591 }
2592
2593 info!("Registered remote MCP service from {} with {} tools",
2594 message.source_peer, tools.len());
2595 }
2596
2597 Ok(())
2598 }
2599
2600 pub async fn handle_service_discovery(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
2602 let local_services: Vec<MCPService> = {
2604 let services = self.local_services.read().await;
2605 services.values().cloned().collect()
2606 };
2607
2608 if !local_services.is_empty() {
2609 let advertisement = P2PMCPMessage {
2610 message_type: P2PMCPMessageType::ServiceAdvertisement,
2611 message_id: uuid::Uuid::new_v4().to_string(),
2612 source_peer: "local".to_string(), target_peer: Some(message.source_peer),
2614 timestamp: SystemTime::now()
2615 .duration_since(std::time::UNIX_EPOCH)
2616 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
2617 .as_secs(),
2618 payload: MCPMessage::ListToolsResult {
2619 tools: local_services.into_iter()
2620 .flat_map(|s| s.tools.into_iter().map(|t| MCPTool {
2621 name: t,
2622 description: "Remote tool".to_string(),
2623 input_schema: json!({"type": "object"}),
2624 }))
2625 .collect(),
2626 next_cursor: None,
2627 },
2628 ttl: message.ttl.saturating_sub(1),
2629 };
2630
2631 let response_data = serde_json::to_vec(&advertisement)
2632 .map_err(|e| P2PError::Serialization(e))?;
2633
2634 Ok(Some(response_data))
2635 } else {
2636 Ok(None)
2637 }
2638 }
2639
2640 pub async fn shutdown(&self) -> Result<()> {
2642 info!("Shutting down MCP server");
2643
2644 {
2646 let mut sessions = self.sessions.write().await;
2647 for session in sessions.values_mut() {
2648 session.state = MCPSessionState::Terminated;
2649 }
2650 sessions.clear();
2651 }
2652
2653 info!("MCP server shutdown complete");
2656 Ok(())
2657 }
2658}
2659
2660impl Tool {
2661 pub fn new(name: &str, description: &str, input_schema: Value) -> ToolBuilder {
2663 ToolBuilder {
2664 name: name.to_string(),
2665 description: description.to_string(),
2666 input_schema,
2667 handler: None,
2668 tags: Vec::new(),
2669 }
2670 }
2671}
2672
/// Step-by-step builder for a `Tool`, created by `Tool::new`.
pub struct ToolBuilder {
    /// Name copied into the tool definition.
    name: String,
    /// Description copied into the tool definition.
    description: String,
    /// JSON value stored as the tool definition's input schema.
    input_schema: Value,
    /// Handler that executes the tool; `build` fails while this is `None`.
    handler: Option<Box<dyn ToolHandler + Send + Sync>>,
    /// Tags stored in the tool's metadata.
    tags: Vec<String>,
}
2681
2682impl ToolBuilder {
2683 pub fn handler<H: ToolHandler + Send + Sync + 'static>(mut self, handler: H) -> Self {
2685 self.handler = Some(Box::new(handler));
2686 self
2687 }
2688
2689 pub fn tags(mut self, tags: Vec<String>) -> Self {
2691 self.tags = tags;
2692 self
2693 }
2694
2695 pub fn build(self) -> Result<Tool> {
2697 let handler = self.handler
2698 .ok_or_else(|| P2PError::MCP("Tool handler is required".to_string()))?;
2699
2700 let definition = MCPTool {
2701 name: self.name,
2702 description: self.description,
2703 input_schema: self.input_schema,
2704 };
2705
2706 let metadata = ToolMetadata {
2707 created_at: SystemTime::now(),
2708 last_called: None,
2709 call_count: 0,
2710 avg_execution_time: Duration::from_millis(0),
2711 health_status: ToolHealthStatus::Healthy,
2712 tags: self.tags,
2713 };
2714
2715 Ok(Tool {
2716 definition,
2717 handler,
2718 metadata,
2719 })
2720 }
2721}
2722
/// Adapter that wraps a function value so it can serve as a tool handler.
pub struct FunctionToolHandler<F> {
    /// The wrapped function; `execute` calls it with the tool arguments.
    function: F,
}
2727
2728impl<F, Fut> ToolHandler for FunctionToolHandler<F>
2729where
2730 F: Fn(Value) -> Fut + Send + Sync,
2731 Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
2732{
2733 fn execute(&self, arguments: Value) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>> {
2734 Box::pin((self.function)(arguments))
2735 }
2736}
2737
2738impl<F> FunctionToolHandler<F> {
2739 pub fn new(function: F) -> Self {
2741 Self { function }
2742 }
2743}
2744
2745impl MCPService {
2747 pub fn new(service_id: String, node_id: PeerId) -> Self {
2749 Self {
2750 service_id,
2751 node_id,
2752 tools: Vec::new(),
2753 capabilities: MCPCapabilities {
2754 experimental: None,
2755 sampling: None,
2756 tools: Some(MCPToolsCapability {
2757 list_changed: Some(true),
2758 }),
2759 prompts: None,
2760 resources: None,
2761 logging: None,
2762 },
2763 metadata: MCPServiceMetadata {
2764 name: "MCP Service".to_string(),
2765 version: "1.0.0".to_string(),
2766 description: None,
2767 tags: Vec::new(),
2768 health_status: ServiceHealthStatus::Healthy,
2769 load_metrics: ServiceLoadMetrics {
2770 active_requests: 0,
2771 requests_per_second: 0.0,
2772 avg_response_time_ms: 0.0,
2773 error_rate: 0.0,
2774 cpu_usage: 0.0,
2775 memory_usage: 0,
2776 },
2777 },
2778 registered_at: SystemTime::now(),
2779 endpoint: MCPEndpoint {
2780 protocol: "p2p".to_string(),
2781 address: "".to_string(),
2782 port: None,
2783 tls: false,
2784 auth_required: false,
2785 },
2786 }
2787 }
2788}
2789
2790impl Default for MCPCapabilities {
2791 fn default() -> Self {
2792 Self {
2793 experimental: None,
2794 sampling: None,
2795 tools: Some(MCPToolsCapability {
2796 list_changed: Some(true),
2797 }),
2798 prompts: Some(MCPPromptsCapability {
2799 list_changed: Some(true),
2800 }),
2801 resources: Some(MCPResourcesCapability {
2802 subscribe: Some(true),
2803 list_changed: Some(true),
2804 }),
2805 logging: Some(MCPLoggingCapability {
2806 levels: Some(vec![
2807 MCPLogLevel::Debug,
2808 MCPLogLevel::Info,
2809 MCPLogLevel::Warning,
2810 MCPLogLevel::Error,
2811 ]),
2812 }),
2813 }
2814 }
2815}
2816
2817#[cfg(test)]
2818mod tests {
2819 use super::*;
2820 use crate::dht::{DHT, DHTConfig, Key};
2821 use std::pin::Pin;
2822 use std::future::Future;
2823 use tokio::time::timeout;
2824
    /// Configurable tool handler used by the tests in this module.
    struct TestTool {
        /// Name echoed in the success payload and in the error message.
        name: String,
        /// When `true`, `execute` returns an error instead of a result.
        should_error: bool,
        /// How long `execute` sleeps before producing its outcome.
        execution_time: Duration,
    }
2831
2832 impl TestTool {
2833 fn new(name: &str) -> Self {
2834 Self {
2835 name: name.to_string(),
2836 should_error: false,
2837 execution_time: Duration::from_millis(10),
2838 }
2839 }
2840
2841 fn with_error(mut self) -> Self {
2842 self.should_error = true;
2843 self
2844 }
2845
2846 fn with_execution_time(mut self, duration: Duration) -> Self {
2847 self.execution_time = duration;
2848 self
2849 }
2850 }
2851
2852 impl ToolHandler for TestTool {
2853 fn execute(&self, arguments: Value) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
2854 let should_error = self.should_error;
2855 let execution_time = self.execution_time;
2856 let name = self.name.clone();
2857
2858 Box::pin(async move {
2859 tokio::time::sleep(execution_time).await;
2860
2861 if should_error {
2862 return Err(P2PError::MCP(format!("Test error from tool {}", name)).into());
2863 }
2864
2865 Ok(json!({
2867 "tool": name,
2868 "arguments": arguments,
2869 "result": "success"
2870 }))
2871 })
2872 }
2873
2874 fn validate(&self, arguments: &Value) -> Result<()> {
2875 if !arguments.is_object() {
2876 return Err(P2PError::MCP("Arguments must be an object".to_string()).into());
2877 }
2878 Ok(())
2879 }
2880
2881 fn get_requirements(&self) -> ToolRequirements {
2882 ToolRequirements {
2883 max_memory: Some(1024 * 1024), max_execution_time: Some(Duration::from_secs(5)),
2885 required_capabilities: vec!["test".to_string()],
2886 requires_network: false,
2887 requires_filesystem: false,
2888 }
2889 }
2890 }
2891
2892 async fn create_test_mcp_server() -> MCPServer {
2894 let config = MCPServerConfig {
2895 server_name: "test_server".to_string(),
2896 server_version: "1.0.0".to_string(),
2897 enable_auth: false,
2898 enable_rate_limiting: false,
2899 max_concurrent_requests: 10,
2900 request_timeout: Duration::from_secs(30),
2901 enable_dht_discovery: true,
2902 rate_limit_rpm: 60,
2903 enable_logging: true,
2904 max_tool_execution_time: Duration::from_secs(30),
2905 tool_memory_limit: 100 * 1024 * 1024,
2906 health_monitor: HealthMonitorConfig::default(),
2907 };
2908
2909 MCPServer::new(config)
2910 }
2911
2912 fn create_test_tool(name: &str) -> Tool {
2914 Tool {
2915 definition: MCPTool {
2916 name: name.to_string(),
2917 description: format!("Test tool: {}", name),
2918 input_schema: json!({
2919 "type": "object",
2920 "properties": {
2921 "input": { "type": "string" }
2922 }
2923 }),
2924 },
2925 handler: Box::new(TestTool::new(name)),
2926 metadata: ToolMetadata {
2927 created_at: SystemTime::now(),
2928 last_called: None,
2929 call_count: 0,
2930 avg_execution_time: Duration::from_millis(0),
2931 health_status: ToolHealthStatus::Healthy,
2932 tags: vec!["test".to_string()],
2933 },
2934 }
2935 }
2936
2937 async fn create_test_dht() -> DHT {
2939 let local_id = Key::new(b"test_node_id");
2940 let config = DHTConfig::default();
2941 DHT::new(local_id, config)
2942 }
2943
2944 fn create_test_context(caller_id: PeerId) -> MCPCallContext {
2946 MCPCallContext {
2947 caller_id,
2948 timestamp: SystemTime::now(),
2949 timeout: Duration::from_secs(30),
2950 auth_info: None,
2951 metadata: HashMap::new(),
2952 }
2953 }
2954
2955 #[tokio::test]
2956 async fn test_mcp_server_creation() {
2957 let server = create_test_mcp_server().await;
2958 assert_eq!(server.config.server_name, "test_server");
2959 assert_eq!(server.config.server_version, "1.0.0");
2960 assert!(!server.config.enable_auth);
2961 assert!(!server.config.enable_rate_limiting);
2962 }
2963
2964 #[tokio::test]
2965 async fn test_tool_registration() -> Result<()> {
2966 let server = create_test_mcp_server().await;
2967 let tool = create_test_tool("test_calculator");
2968
2969 server.register_tool(tool).await?;
2971
2972 let tools = server.tools.read().await;
2974 assert!(tools.contains_key("test_calculator"));
2975 assert_eq!(tools.get("test_calculator").unwrap().definition.name, "test_calculator");
2976
2977 let stats = server.stats.read().await;
2979 assert_eq!(stats.total_tools, 1);
2980
2981 Ok(())
2982 }
2983
2984 #[tokio::test]
2985 async fn test_tool_registration_duplicate() -> Result<()> {
2986 let server = create_test_mcp_server().await;
2987 let tool1 = create_test_tool("duplicate_tool");
2988 let tool2 = create_test_tool("duplicate_tool");
2989
2990 server.register_tool(tool1).await?;
2992
2993 let result = server.register_tool(tool2).await;
2995 assert!(result.is_err());
2996 assert!(result.unwrap_err().to_string().contains("Tool already exists"));
2997
2998 Ok(())
2999 }
3000
3001 #[tokio::test]
3002 async fn test_tool_validation() {
3003 let server = create_test_mcp_server().await;
3004
3005 let mut invalid_tool = create_test_tool("");
3007 let result = server.validate_tool(&invalid_tool).await;
3008 assert!(result.is_err());
3009
3010 invalid_tool.definition.name = "a".repeat(200);
3012 let result = server.validate_tool(&invalid_tool).await;
3013 assert!(result.is_err());
3014
3015 let mut invalid_schema_tool = create_test_tool("valid_name");
3017 invalid_schema_tool.definition.input_schema = json!("not an object");
3018 let result = server.validate_tool(&invalid_schema_tool).await;
3019 assert!(result.is_err());
3020
3021 let valid_tool = create_test_tool("valid_tool");
3023 let result = server.validate_tool(&valid_tool).await;
3024 assert!(result.is_ok());
3025 }
3026
3027 #[tokio::test]
3028 async fn test_tool_call_success() -> Result<()> {
3029 let server = create_test_mcp_server().await;
3030 let tool = create_test_tool("success_tool");
3031 server.register_tool(tool).await?;
3032
3033 let caller_id = "test_peer_123".to_string();
3034 let context = create_test_context(caller_id);
3035 let arguments = json!({"input": "test data"});
3036
3037 let result = server.call_tool("success_tool", arguments.clone(), context).await?;
3038
3039 assert_eq!(result["tool"], "success_tool");
3041 assert_eq!(result["arguments"], arguments);
3042 assert_eq!(result["result"], "success");
3043
3044 let tools = server.tools.read().await;
3046 let tool_metadata = &tools.get("success_tool").unwrap().metadata;
3047 assert_eq!(tool_metadata.call_count, 1);
3048 assert!(tool_metadata.last_called.is_some());
3049
3050 Ok(())
3051 }
3052
3053 #[tokio::test]
3054 async fn test_tool_call_nonexistent() -> Result<()> {
3055 let server = create_test_mcp_server().await;
3056 let caller_id = "test_peer_456".to_string();
3057 let context = create_test_context(caller_id);
3058 let arguments = json!({"input": "test"});
3059
3060 let result = server.call_tool("nonexistent_tool", arguments, context).await;
3061 assert!(result.is_err());
3062 assert!(result.unwrap_err().to_string().contains("Tool not found"));
3063
3064 Ok(())
3065 }
3066
3067 #[tokio::test]
3068 async fn test_tool_call_handler_error() -> Result<()> {
3069 let server = create_test_mcp_server().await;
3070 let tool = Tool {
3071 definition: MCPTool {
3072 name: "error_tool".to_string(),
3073 description: "Tool that always errors".to_string(),
3074 input_schema: json!({"type": "object"}),
3075 },
3076 handler: Box::new(TestTool::new("error_tool").with_error()),
3077 metadata: ToolMetadata {
3078 created_at: SystemTime::now(),
3079 last_called: None,
3080 call_count: 0,
3081 avg_execution_time: Duration::from_millis(0),
3082 health_status: ToolHealthStatus::Healthy,
3083 tags: vec![],
3084 },
3085 };
3086
3087 server.register_tool(tool).await?;
3088
3089 let caller_id = "test_peer_error".to_string();
3090 let context = create_test_context(caller_id);
3091 let arguments = json!({"input": "test"});
3092
3093 let result = server.call_tool("error_tool", arguments, context).await;
3094 assert!(result.is_err());
3095 assert!(result.unwrap_err().to_string().contains("Test error from tool error_tool"));
3096
3097 Ok(())
3098 }
3099
3100 #[tokio::test]
3101 async fn test_tool_call_timeout() -> Result<()> {
3102 let server = create_test_mcp_server().await;
3103 let slow_tool = Tool {
3104 definition: MCPTool {
3105 name: "slow_tool".to_string(),
3106 description: "Tool that takes too long".to_string(),
3107 input_schema: json!({"type": "object"}),
3108 },
3109 handler: Box::new(TestTool::new("slow_tool").with_execution_time(Duration::from_secs(2))),
3110 metadata: ToolMetadata {
3111 created_at: SystemTime::now(),
3112 last_called: None,
3113 call_count: 0,
3114 avg_execution_time: Duration::from_millis(0),
3115 health_status: ToolHealthStatus::Healthy,
3116 tags: vec![],
3117 },
3118 };
3119
3120 server.register_tool(slow_tool).await?;
3121
3122 let caller_id = "test_peer_error".to_string();
3123 let context = create_test_context(caller_id);
3124 let arguments = json!({"input": "test"});
3125
3126 let result = timeout(
3128 Duration::from_millis(100),
3129 server.call_tool("slow_tool", arguments, context)
3130 ).await;
3131
3132 assert!(result.is_err()); Ok(())
3135 }
3136
3137 #[tokio::test]
3138 async fn test_tool_requirements() {
3139 let tool = TestTool::new("req_tool");
3140 let requirements = tool.get_requirements();
3141
3142 assert_eq!(requirements.max_memory, Some(1024 * 1024));
3143 assert_eq!(requirements.max_execution_time, Some(Duration::from_secs(5)));
3144 assert_eq!(requirements.required_capabilities, vec!["test"]);
3145 assert!(!requirements.requires_network);
3146 assert!(!requirements.requires_filesystem);
3147 }
3148
3149 #[tokio::test]
3150 async fn test_tool_validation_handler() {
3151 let tool = TestTool::new("validation_tool");
3152
3153 let valid_args = json!({"key": "value"});
3155 assert!(tool.validate(&valid_args).is_ok());
3156
3157 let invalid_args = json!("not an object");
3159 assert!(tool.validate(&invalid_args).is_err());
3160
3161 let invalid_args = json!(123);
3162 assert!(tool.validate(&invalid_args).is_err());
3163 }
3164
3165 #[tokio::test]
3166 async fn test_tool_health_status() {
3167 let mut metadata = ToolMetadata {
3168 created_at: SystemTime::now(),
3169 last_called: None,
3170 call_count: 0,
3171 avg_execution_time: Duration::from_millis(0),
3172 health_status: ToolHealthStatus::Healthy,
3173 tags: vec![],
3174 };
3175
3176 assert_eq!(metadata.health_status, ToolHealthStatus::Healthy);
3178
3179 metadata.health_status = ToolHealthStatus::Degraded;
3180 assert_eq!(metadata.health_status, ToolHealthStatus::Degraded);
3181
3182 metadata.health_status = ToolHealthStatus::Unhealthy;
3183 assert_eq!(metadata.health_status, ToolHealthStatus::Unhealthy);
3184
3185 metadata.health_status = ToolHealthStatus::Disabled;
3186 assert_eq!(metadata.health_status, ToolHealthStatus::Disabled);
3187 }
3188
3189 #[tokio::test]
3190 async fn test_mcp_capabilities() {
3191 let server = create_test_mcp_server().await;
3192 let capabilities = server.get_server_capabilities().await;
3193
3194 assert!(capabilities.tools.is_some());
3195 assert!(capabilities.prompts.is_some());
3196 assert!(capabilities.resources.is_some());
3197 assert!(capabilities.logging.is_some());
3198
3199 let tools_cap = capabilities.tools.unwrap();
3200 assert_eq!(tools_cap.list_changed, Some(true));
3201
3202 let logging_cap = capabilities.logging.unwrap();
3203 let levels = logging_cap.levels.unwrap();
3204 assert!(levels.contains(&MCPLogLevel::Debug));
3205 assert!(levels.contains(&MCPLogLevel::Info));
3206 assert!(levels.contains(&MCPLogLevel::Warning));
3207 assert!(levels.contains(&MCPLogLevel::Error));
3208 }
3209
3210 #[tokio::test]
3211 async fn test_mcp_message_serialization() {
3212 let init_msg = MCPMessage::Initialize {
3214 protocol_version: MCP_VERSION.to_string(),
3215 capabilities: MCPCapabilities {
3216 experimental: None,
3217 sampling: None,
3218 tools: Some(MCPToolsCapability { list_changed: Some(true) }),
3219 prompts: None,
3220 resources: None,
3221 logging: None,
3222 },
3223 client_info: MCPClientInfo {
3224 name: "test_client".to_string(),
3225 version: "1.0.0".to_string(),
3226 },
3227 };
3228
3229 let serialized = serde_json::to_string(&init_msg).unwrap();
3230 let deserialized: MCPMessage = serde_json::from_str(&serialized).unwrap();
3231
3232 match deserialized {
3233 MCPMessage::Initialize { protocol_version, client_info, .. } => {
3234 assert_eq!(protocol_version, MCP_VERSION);
3235 assert_eq!(client_info.name, "test_client");
3236 assert_eq!(client_info.version, "1.0.0");
3237 }
3238 _ => panic!("Wrong message type after deserialization"),
3239 }
3240 }
3241
3242 #[tokio::test]
3243 async fn test_mcp_content_types() {
3244 let text_content = MCPContent::Text {
3246 text: "Hello, world!".to_string(),
3247 };
3248
3249 let serialized = serde_json::to_string(&text_content).unwrap();
3250 let deserialized: MCPContent = serde_json::from_str(&serialized).unwrap();
3251
3252 match deserialized {
3253 MCPContent::Text { text } => assert_eq!(text, "Hello, world!"),
3254 _ => panic!("Wrong content type"),
3255 }
3256
3257 let image_content = MCPContent::Image {
3259 data: "base64data".to_string(),
3260 mime_type: "image/png".to_string(),
3261 };
3262
3263 let serialized = serde_json::to_string(&image_content).unwrap();
3264 let deserialized: MCPContent = serde_json::from_str(&serialized).unwrap();
3265
3266 match deserialized {
3267 MCPContent::Image { data, mime_type } => {
3268 assert_eq!(data, "base64data");
3269 assert_eq!(mime_type, "image/png");
3270 }
3271 _ => panic!("Wrong content type"),
3272 }
3273 }
3274
3275 #[tokio::test]
3276 async fn test_service_health_status() {
3277 let mut metrics = ServiceLoadMetrics {
3278 active_requests: 0,
3279 requests_per_second: 0.0,
3280 avg_response_time_ms: 0.0,
3281 error_rate: 0.0,
3282 cpu_usage: 0.0,
3283 memory_usage: 0,
3284 };
3285
3286 let metadata = MCPServiceMetadata {
3288 name: "test_service".to_string(),
3289 version: "1.0.0".to_string(),
3290 description: Some("Test service".to_string()),
3291 tags: vec!["test".to_string()],
3292 health_status: ServiceHealthStatus::Healthy,
3293 load_metrics: metrics.clone(),
3294 };
3295
3296 assert_eq!(metadata.health_status, ServiceHealthStatus::Healthy);
3297
3298 metrics.error_rate = 0.5; let degraded_metadata = MCPServiceMetadata {
3301 health_status: ServiceHealthStatus::Degraded,
3302 load_metrics: metrics.clone(),
3303 ..metadata.clone()
3304 };
3305
3306 assert_eq!(degraded_metadata.health_status, ServiceHealthStatus::Degraded);
3307
3308 let unhealthy_metadata = MCPServiceMetadata {
3309 health_status: ServiceHealthStatus::Unhealthy,
3310 ..metadata.clone()
3311 };
3312
3313 assert_eq!(unhealthy_metadata.health_status, ServiceHealthStatus::Unhealthy);
3314 }
3315
3316 #[tokio::test]
3317 async fn test_p2p_mcp_message() {
3318 let source_peer = "source_peer_123".to_string();
3319 let target_peer = "target_peer_456".to_string();
3320
3321 let p2p_message = P2PMCPMessage {
3322 message_type: P2PMCPMessageType::Request,
3323 message_id: uuid::Uuid::new_v4().to_string(),
3324 source_peer: source_peer.clone(),
3325 target_peer: Some(target_peer.clone()),
3326 timestamp: SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(),
3327 payload: MCPMessage::ListTools { cursor: None },
3328 ttl: 10,
3329 };
3330
3331 let serialized = serde_json::to_string(&p2p_message).unwrap();
3333 let deserialized: P2PMCPMessage = serde_json::from_str(&serialized).unwrap();
3334
3335 assert_eq!(deserialized.message_type, P2PMCPMessageType::Request);
3336 assert_eq!(deserialized.source_peer, source_peer);
3337 assert_eq!(deserialized.target_peer, Some(target_peer));
3338 assert_eq!(deserialized.ttl, 10);
3339
3340 match deserialized.payload {
3341 MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3342 _ => panic!("Wrong message payload type"),
3343 }
3344 }
3345
3346 #[tokio::test]
3347 async fn test_tool_requirements_default() {
3348 let default_requirements = ToolRequirements::default();
3349
3350 assert_eq!(default_requirements.max_memory, Some(100 * 1024 * 1024));
3351 assert_eq!(default_requirements.max_execution_time, Some(Duration::from_secs(30)));
3352 assert!(default_requirements.required_capabilities.is_empty());
3353 assert!(!default_requirements.requires_network);
3354 assert!(!default_requirements.requires_filesystem);
3355 }
3356
3357 #[tokio::test]
3358 async fn test_mcp_server_stats() {
3359 let server = create_test_mcp_server().await;
3360
3361 let stats = server.stats.read().await;
3363 assert_eq!(stats.total_tools, 0);
3364 assert_eq!(stats.total_requests, 0);
3365 assert_eq!(stats.total_responses, 0);
3366 assert_eq!(stats.total_errors, 0);
3367
3368 drop(stats);
3369
3370 let tool = create_test_tool("stats_test_tool");
3372 server.register_tool(tool).await.unwrap();
3373
3374 let stats = server.stats.read().await;
3375 assert_eq!(stats.total_tools, 1);
3376 }
3377
3378 #[tokio::test]
3379 async fn test_log_levels() {
3380 let levels = vec![
3382 MCPLogLevel::Debug,
3383 MCPLogLevel::Info,
3384 MCPLogLevel::Notice,
3385 MCPLogLevel::Warning,
3386 MCPLogLevel::Error,
3387 MCPLogLevel::Critical,
3388 MCPLogLevel::Alert,
3389 MCPLogLevel::Emergency,
3390 ];
3391
3392 for level in levels {
3393 let serialized = serde_json::to_string(&level).unwrap();
3394 let deserialized: MCPLogLevel = serde_json::from_str(&serialized).unwrap();
3395 assert_eq!(level as u8, deserialized as u8);
3396 }
3397 }
3398
3399 #[tokio::test]
3400 async fn test_mcp_endpoint() {
3401 let endpoint = MCPEndpoint {
3402 protocol: "p2p".to_string(),
3403 address: "127.0.0.1".to_string(),
3404 port: Some(9000),
3405 tls: true,
3406 auth_required: true,
3407 };
3408
3409 let serialized = serde_json::to_string(&endpoint).unwrap();
3410 let deserialized: MCPEndpoint = serde_json::from_str(&serialized).unwrap();
3411
3412 assert_eq!(deserialized.protocol, "p2p");
3413 assert_eq!(deserialized.address, "127.0.0.1");
3414 assert_eq!(deserialized.port, Some(9000));
3415 assert!(deserialized.tls);
3416 assert!(deserialized.auth_required);
3417 }
3418
3419 #[tokio::test]
3420 async fn test_mcp_service_metadata() {
3421 let load_metrics = ServiceLoadMetrics {
3422 active_requests: 5,
3423 requests_per_second: 10.5,
3424 avg_response_time_ms: 250.0,
3425 error_rate: 0.01,
3426 cpu_usage: 45.5,
3427 memory_usage: 1024 * 1024 * 100, };
3429
3430 let metadata = MCPServiceMetadata {
3431 name: "test_service".to_string(),
3432 version: "2.1.0".to_string(),
3433 description: Some("A test service for unit testing".to_string()),
3434 tags: vec!["test".to_string(), "unit".to_string(), "mcp".to_string()],
3435 health_status: ServiceHealthStatus::Healthy,
3436 load_metrics,
3437 };
3438
3439 let serialized = serde_json::to_string(&metadata).unwrap();
3441 let deserialized: MCPServiceMetadata = serde_json::from_str(&serialized).unwrap();
3442
3443 assert_eq!(deserialized.name, "test_service");
3444 assert_eq!(deserialized.version, "2.1.0");
3445 assert_eq!(deserialized.description, Some("A test service for unit testing".to_string()));
3446 assert_eq!(deserialized.tags, vec!["test", "unit", "mcp"]);
3447 assert_eq!(deserialized.health_status, ServiceHealthStatus::Healthy);
3448 assert_eq!(deserialized.load_metrics.active_requests, 5);
3449 assert_eq!(deserialized.load_metrics.requests_per_second, 10.5);
3450 }
3451
3452 #[tokio::test]
3453 async fn test_function_tool_handler() {
3454 let handler = FunctionToolHandler::new(|args: Value| async move {
3456 let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("world");
3457 Ok(json!({"greeting": format!("Hello, {}!", name)}))
3458 });
3459
3460 let args = json!({"name": "Alice"});
3461 let result = handler.execute(args).await.unwrap();
3462 assert_eq!(result["greeting"], "Hello, Alice!");
3463
3464 let empty_args = json!({});
3466 let result = handler.execute(empty_args).await.unwrap();
3467 assert_eq!(result["greeting"], "Hello, world!");
3468 }
3469
3470 #[tokio::test]
3471 async fn test_mcp_service_creation() {
3472 let service_id = "test_service_123".to_string();
3473 let node_id = "test_node_789".to_string();
3474
3475 let service = MCPService::new(service_id.clone(), node_id.clone());
3476
3477 assert_eq!(service.service_id, service_id);
3478 assert_eq!(service.node_id, node_id);
3479 assert!(service.tools.is_empty());
3480 assert_eq!(service.metadata.name, "MCP Service");
3481 assert_eq!(service.metadata.version, "1.0.0");
3482 assert_eq!(service.metadata.health_status, ServiceHealthStatus::Healthy);
3483 assert_eq!(service.endpoint.protocol, "p2p");
3484 assert!(!service.endpoint.tls);
3485 assert!(!service.endpoint.auth_required);
3486 }
3487
3488 #[tokio::test]
3489 async fn test_mcp_capabilities_default() {
3490 let capabilities = MCPCapabilities::default();
3491
3492 assert!(capabilities.tools.is_some());
3493 assert!(capabilities.prompts.is_some());
3494 assert!(capabilities.resources.is_some());
3495 assert!(capabilities.logging.is_some());
3496
3497 let tools_cap = capabilities.tools.unwrap();
3498 assert_eq!(tools_cap.list_changed, Some(true));
3499
3500 let resources_cap = capabilities.resources.unwrap();
3501 assert_eq!(resources_cap.subscribe, Some(true));
3502 assert_eq!(resources_cap.list_changed, Some(true));
3503
3504 let logging_cap = capabilities.logging.unwrap();
3505 let levels = logging_cap.levels.unwrap();
3506 assert!(levels.contains(&MCPLogLevel::Debug));
3507 assert!(levels.contains(&MCPLogLevel::Info));
3508 assert!(levels.contains(&MCPLogLevel::Warning));
3509 assert!(levels.contains(&MCPLogLevel::Error));
3510 }
3511
3512 #[tokio::test]
3513 async fn test_mcp_request_creation() {
3514 let source_peer = "source_peer_123".to_string();
3515 let target_peer = "target_peer_456".to_string();
3516
3517 let request = MCPRequest {
3518 request_id: uuid::Uuid::new_v4().to_string(),
3519 source_peer: source_peer.clone(),
3520 target_peer: target_peer.clone(),
3521 message: MCPMessage::ListTools { cursor: None },
3522 timestamp: SystemTime::now(),
3523 timeout: Duration::from_secs(30),
3524 auth_token: Some("test_token".to_string()),
3525 };
3526
3527 assert_eq!(request.source_peer, source_peer);
3528 assert_eq!(request.target_peer, target_peer);
3529 assert_eq!(request.timeout, Duration::from_secs(30));
3530 assert_eq!(request.auth_token, Some("test_token".to_string()));
3531
3532 match request.message {
3533 MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3534 _ => panic!("Wrong message type"),
3535 }
3536 }
3537
3538 #[tokio::test]
3539 async fn test_p2p_message_types() {
3540 assert_eq!(P2PMCPMessageType::Request, P2PMCPMessageType::Request);
3542 assert_eq!(P2PMCPMessageType::Response, P2PMCPMessageType::Response);
3543 assert_eq!(P2PMCPMessageType::ServiceAdvertisement, P2PMCPMessageType::ServiceAdvertisement);
3544 assert_eq!(P2PMCPMessageType::ServiceDiscovery, P2PMCPMessageType::ServiceDiscovery);
3545
3546 for msg_type in [
3548 P2PMCPMessageType::Request,
3549 P2PMCPMessageType::Response,
3550 P2PMCPMessageType::ServiceAdvertisement,
3551 P2PMCPMessageType::ServiceDiscovery,
3552 ] {
3553 let serialized = serde_json::to_string(&msg_type).unwrap();
3554 let deserialized: P2PMCPMessageType = serde_json::from_str(&serialized).unwrap();
3555 assert_eq!(msg_type, deserialized);
3556 }
3557 }
3558}