1#![allow(missing_docs)]
26
27pub mod security;
28
29use crate::dht::{DHT, Key};
30use crate::{P2PError, PeerId, Result};
31use rand;
32use serde::{Deserialize, Serialize};
33use serde_json::{Value, json};
34use std::collections::HashMap;
35use std::sync::Arc;
36use std::time::{Duration, Instant, SystemTime};
37use tokio::sync::{RwLock, mpsc, oneshot};
38use tokio::time::timeout;
39use tracing::{debug, info, warn};
40
41pub use security::*;
42
/// MCP protocol version string used by this module.
pub const MCP_VERSION: &str = "2024-11-05";

/// Upper bound on message size: `1024 * 1024` (presumably bytes, i.e. 1 MiB —
/// confirm at the enforcement site, which is not visible here).
pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;

/// Default call timeout (30 seconds); used as the default
/// `request_timeout` of [`MCPServerConfig`].
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);

/// Protocol identifier passed to [`NetworkSender::send_message`] for MCP traffic.
pub const MCP_PROTOCOL: &str = "/p2p-foundation/mcp/1.0.0";
54
/// Abstraction over the transport the MCP server uses to reach other peers.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Sends `data` to `peer_id` tagged with the given `protocol` identifier.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the peer ID of the local node.
    fn local_peer_id(&self) -> &PeerId;
}
64
/// Shared, thread-safe callback with the same argument shape as
/// [`NetworkSender::send_message`] (peer, protocol, payload), but synchronous.
pub type MessageSender = Arc<dyn Fn(&PeerId, &str, Vec<u8>) -> Result<()> + Send + Sync>;
67
/// Delay between successive DHT service-discovery polls (60 seconds).
pub const SERVICE_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
70
/// MCP protocol messages.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case into a `"type"` field alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MCPMessage {
    /// Session initialization request.
    Initialize {
        /// Protocol version offered by the sender.
        protocol_version: String,
        /// Capabilities advertised by the client.
        capabilities: MCPCapabilities,
        /// Identification of the client.
        client_info: MCPClientInfo,
    },
    /// Reply to [`MCPMessage::Initialize`].
    InitializeResult {
        /// Protocol version in the reply.
        protocol_version: String,
        /// Capabilities advertised by the server.
        capabilities: MCPCapabilities,
        /// Identification of the server.
        server_info: MCPServerInfo,
    },
    /// Request for the list of tools.
    ListTools {
        /// Optional cursor (presumably for pagination — confirm with the handler).
        cursor: Option<String>,
    },
    /// Reply to [`MCPMessage::ListTools`].
    ListToolsResult {
        /// Tool definitions.
        tools: Vec<MCPTool>,
        /// Cursor for a following request, if any.
        next_cursor: Option<String>,
    },
    /// Request to invoke a tool.
    CallTool {
        /// Name of the tool to invoke.
        name: String,
        /// Arguments passed to the tool as JSON.
        arguments: Value,
    },
    /// Reply to [`MCPMessage::CallTool`].
    CallToolResult {
        /// Content items produced by the call.
        content: Vec<MCPContent>,
        /// Whether the call is reported as an error.
        is_error: bool,
    },
    /// Request for the list of prompts.
    ListPrompts {
        /// Optional cursor (presumably for pagination — confirm with the handler).
        cursor: Option<String>,
    },
    /// Reply to [`MCPMessage::ListPrompts`].
    ListPromptsResult {
        /// Prompt definitions.
        prompts: Vec<MCPPrompt>,
        /// Cursor for a following request, if any.
        next_cursor: Option<String>,
    },
    /// Request for a single prompt.
    GetPrompt {
        /// Name of the prompt.
        name: String,
        /// Optional prompt arguments as JSON.
        arguments: Option<Value>,
    },
    /// Reply to [`MCPMessage::GetPrompt`].
    GetPromptResult {
        /// Optional description of the prompt.
        description: Option<String>,
        /// Messages that make up the prompt.
        messages: Vec<MCPPromptMessage>,
    },
    /// Request for the list of resources.
    ListResources {
        /// Optional cursor (presumably for pagination — confirm with the handler).
        cursor: Option<String>,
    },
    /// Reply to [`MCPMessage::ListResources`].
    ListResourcesResult {
        /// Resource descriptors.
        resources: Vec<MCPResource>,
        /// Cursor for a following request, if any.
        next_cursor: Option<String>,
    },
    /// Request to read a resource.
    ReadResource {
        /// URI of the resource.
        uri: String,
    },
    /// Reply to [`MCPMessage::ReadResource`].
    ReadResourceResult {
        /// Contents of the resource.
        contents: Vec<MCPResourceContent>,
    },
    /// Request to subscribe to a resource.
    SubscribeResource {
        /// URI of the resource.
        uri: String,
    },
    /// Request to cancel a resource subscription.
    UnsubscribeResource {
        /// URI of the resource.
        uri: String,
    },
    /// Notification that a resource changed.
    ResourceUpdated {
        /// URI of the resource.
        uri: String,
    },
    /// Request for the list of log entries.
    ListLogs {
        /// Optional cursor (presumably for pagination — confirm with the handler).
        cursor: Option<String>,
    },
    /// Reply to [`MCPMessage::ListLogs`].
    ListLogsResult {
        /// Log entries.
        logs: Vec<MCPLogEntry>,
        /// Cursor for a following request, if any.
        next_cursor: Option<String>,
    },
    /// Request to change the log level.
    SetLogLevel {
        /// Requested level.
        level: MCPLogLevel,
    },
    /// Error report.
    Error {
        /// Numeric error code.
        code: i32,
        /// Human-readable error message.
        message: String,
        /// Optional additional JSON data.
        data: Option<Value>,
    },
}
209
/// Capability set exchanged during initialization and advertised for services.
///
/// Each field is `None` when the corresponding capability is not declared.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPCapabilities {
    /// Free-form experimental capabilities.
    pub experimental: Option<Value>,
    /// Free-form sampling capability description.
    pub sampling: Option<Value>,
    /// Tool-related capabilities.
    pub tools: Option<MCPToolsCapability>,
    /// Prompt-related capabilities.
    pub prompts: Option<MCPPromptsCapability>,
    /// Resource-related capabilities.
    pub resources: Option<MCPResourcesCapability>,
    /// Logging-related capabilities.
    pub logging: Option<MCPLoggingCapability>,
}
226
/// Tool capability flags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPToolsCapability {
    /// Optional "list changed" flag for the tool list.
    pub list_changed: Option<bool>,
}
233
/// Prompt capability flags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPromptsCapability {
    /// Optional "list changed" flag for the prompt list.
    pub list_changed: Option<bool>,
}
240
/// Resource capability flags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourcesCapability {
    /// Optional flag for resource subscription support.
    pub subscribe: Option<bool>,
    /// Optional "list changed" flag for the resource list.
    pub list_changed: Option<bool>,
}
249
/// Logging capability description.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPLoggingCapability {
    /// Log levels declared by the peer, if any.
    pub levels: Option<Vec<MCPLogLevel>>,
}
256
/// Client identification carried in [`MCPMessage::Initialize`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPClientInfo {
    /// Client name.
    pub name: String,
    /// Client version string.
    pub version: String,
}
265
/// Server identification carried in [`MCPMessage::InitializeResult`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServerInfo {
    /// Server name.
    pub name: String,
    /// Server version string.
    pub version: String,
}
274
/// Serializable definition of a tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPTool {
    /// Tool name; `MCPServer::validate_tool` requires 1 to 100 bytes and uniqueness.
    pub name: String,
    /// Human-readable description.
    pub description: String,
    /// Schema for the tool input; `MCPServer::validate_tool` requires a JSON object.
    pub input_schema: Value,
}
285
/// A registered tool: its definition, the handler that runs it, and runtime metadata.
pub struct Tool {
    /// Definition exposed to clients.
    pub definition: MCPTool,
    /// Handler that validates arguments and executes the tool.
    pub handler: Box<dyn ToolHandler + Send + Sync>,
    /// Usage and health bookkeeping, updated by `MCPServer::update_tool_stats`.
    pub metadata: ToolMetadata,
}
295
/// Runtime bookkeeping for a registered [`Tool`].
#[derive(Debug, Clone)]
pub struct ToolMetadata {
    /// Creation time of the tool entry.
    pub created_at: SystemTime,
    /// Time of the most recent recorded call, if any.
    pub last_called: Option<SystemTime>,
    /// Number of recorded calls.
    pub call_count: u64,
    /// Running mean of execution time over the recorded calls.
    pub avg_execution_time: Duration,
    /// Current health status.
    pub health_status: ToolHealthStatus,
    /// Free-form tags.
    pub tags: Vec<String>,
}
312
/// Health state of a tool.
///
/// `MCPServer::update_tool_stats` steps `Healthy -> Degraded -> Unhealthy` on
/// failures, resets to `Healthy` on success, and never changes `Disabled`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ToolHealthStatus {
    /// No recent recorded failure.
    Healthy,
    /// One failure recorded since the last healthy state.
    Degraded,
    /// Repeated failures recorded.
    Unhealthy,
    /// Left untouched by the stats updates.
    Disabled,
}
325
/// Settings for the background health monitor started by `MCPServer::start`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMonitorConfig {
    /// Period of the health-check loop.
    pub health_check_interval: Duration,
    /// Timeout applied to sending a single health-check message.
    pub health_check_timeout: Duration,
    /// Consecutive-failure count compared against a service's `failure_count`.
    pub failure_threshold: u32,
    /// Consecutive-success count at which a service is marked healthy.
    pub success_threshold: u32,
    /// Period of the heartbeat loop.
    pub heartbeat_interval: Duration,
    /// Heartbeat timeout (consumed by `check_heartbeat_timeouts`, not visible here).
    pub heartbeat_timeout: Duration,
    /// When `false`, `start_health_monitor` returns without spawning any task.
    pub enabled: bool,
}
344
345impl Default for HealthMonitorConfig {
346 fn default() -> Self {
347 Self {
348 health_check_interval: Duration::from_secs(30),
349 health_check_timeout: Duration::from_secs(5),
350 failure_threshold: 3,
351 success_threshold: 2,
352 heartbeat_interval: Duration::from_secs(60),
353 heartbeat_timeout: Duration::from_secs(300), enabled: true,
355 }
356 }
357}
358
/// Health record kept per service by the health monitor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceHealth {
    /// Identifier of the service this record belongs to.
    pub service_id: String,
    /// Current status; starts as `Unknown` when the record is first created.
    pub status: ServiceHealthStatus,
    /// Time of the last successful health check, if any.
    pub last_health_check: Option<SystemTime>,
    /// Time of the last heartbeat, if any.
    pub last_heartbeat: Option<SystemTime>,
    /// Consecutive failures; reset to 0 on a successful check.
    pub failure_count: u32,
    /// Consecutive successes; reset to 0 on a failed check.
    pub success_count: u32,
    /// Average response time.
    pub avg_response_time: Duration,
    /// Last error message, cleared when the service becomes healthy.
    pub error_message: Option<String>,
    /// Recent check results; `perform_health_check` keeps at most 10 entries.
    pub health_history: Vec<HealthCheckResult>,
}
381
/// Outcome of a single health check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
    /// When the result was recorded.
    pub timestamp: SystemTime,
    /// Whether the check succeeded.
    pub success: bool,
    /// Elapsed time measured for the check.
    pub response_time: Duration,
    /// Error description; `None` on success.
    pub error_message: Option<String>,
}
394
/// Heartbeat payload describing a service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
    /// Identifier of the service.
    pub service_id: String,
    /// Peer that hosts the service.
    pub peer_id: PeerId,
    /// Time the heartbeat was produced.
    pub timestamp: SystemTime,
    /// Load figure (scale not established in this file — confirm with the producer).
    pub load: f32,
    /// Names of the tools currently available.
    pub available_tools: Vec<String>,
    /// Capabilities of the service.
    pub capabilities: MCPCapabilities,
}
411
/// Events sent on the server's health event channel.
#[derive(Debug, Clone)]
pub enum HealthEvent {
    /// A service is considered healthy.
    ServiceHealthy { service_id: String, peer_id: PeerId },
    /// A service is considered unhealthy; `error` describes why.
    ServiceUnhealthy {
        service_id: String,
        peer_id: PeerId,
        error: String,
    },
    /// A service is considered degraded; `reason` describes why.
    ServiceDegraded {
        service_id: String,
        peer_id: PeerId,
        reason: String,
    },
    /// A heartbeat was received, carrying the reported `load`.
    HeartbeatReceived {
        service_id: String,
        peer_id: PeerId,
        load: f32,
    },
    /// A heartbeat timed out for the service.
    HeartbeatTimeout { service_id: String, peer_id: PeerId },
}
438
/// Behaviour a tool implementation must provide.
pub trait ToolHandler {
    /// Runs the tool with `arguments` and returns a boxed future resolving to
    /// the tool's JSON result.
    fn execute(
        &self,
        arguments: Value,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>>;

    /// Validates `arguments` before execution.
    ///
    /// The default implementation accepts everything.
    fn validate(&self, arguments: &Value) -> Result<()> {
        // Intentionally unused by the default implementation.
        let _ = arguments;
        Ok(())
    }

    /// Returns the resource requirements of the tool.
    ///
    /// The default implementation returns [`ToolRequirements::default`].
    fn get_requirements(&self) -> ToolRequirements {
        ToolRequirements::default()
    }
}
459
/// Resource requirements declared by a tool handler.
#[derive(Debug, Clone)]
pub struct ToolRequirements {
    /// Declared memory need, compared against `MCPServerConfig::tool_memory_limit`.
    pub max_memory: Option<u64>,
    /// Declared execution-time need; also caps the execution timeout in `call_tool`.
    pub max_execution_time: Option<Duration>,
    /// Names of required capabilities.
    pub required_capabilities: Vec<String>,
    /// Whether the tool needs network access.
    pub requires_network: bool,
    /// Whether the tool needs filesystem access.
    pub requires_filesystem: bool,
}
474
475impl Default for ToolRequirements {
476 fn default() -> Self {
477 Self {
478 max_memory: Some(100 * 1024 * 1024), max_execution_time: Some(Duration::from_secs(30)),
480 required_capabilities: Vec::new(),
481 requires_network: false,
482 requires_filesystem: false,
483 }
484 }
485}
486
/// Content item carried in tool results and prompt messages.
///
/// Serialized as an internally tagged enum with a snake_case `"type"` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MCPContent {
    /// Text content.
    Text {
        /// The text itself.
        text: String,
    },
    /// Image content.
    Image {
        /// Image data as a string (encoding not established here — presumably base64; confirm).
        data: String,
        /// MIME type of the image.
        mime_type: String,
    },
    /// Reference to a resource.
    Resource {
        /// The referenced resource.
        resource: MCPResourceReference,
    },
}
509
/// Reference to a resource by URI.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourceReference {
    /// URI of the resource.
    pub uri: String,
    /// Optional type of the resource.
    // NOTE(review): without a serde rename this serializes under the key
    // `type_`, not `type` — confirm that is the intended wire name.
    pub type_: Option<String>,
}
518
/// Prompt descriptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPrompt {
    /// Prompt name.
    pub name: String,
    /// Optional description.
    pub description: Option<String>,
    /// Optional argument description as JSON.
    pub arguments: Option<Value>,
}
529
/// One message of a prompt: who speaks and what is said.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPromptMessage {
    /// Role of the message author.
    pub role: MCPRole,
    /// Message content.
    pub content: MCPContent,
}
538
/// Author role of a prompt message; serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MCPRole {
    /// Serialized as `"user"`.
    User,
    /// Serialized as `"assistant"`.
    Assistant,
    /// Serialized as `"system"`.
    System,
}
550
/// Resource descriptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResource {
    /// URI of the resource.
    pub uri: String,
    /// Resource name.
    pub name: String,
    /// Optional description.
    pub description: Option<String>,
    /// Optional MIME type.
    pub mime_type: Option<String>,
}
563
/// Content of a resource returned by a read.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourceContent {
    /// URI of the resource.
    pub uri: String,
    /// MIME type of the content.
    pub mime_type: String,
    /// Textual content, if any.
    pub text: Option<String>,
    /// Binary content as a string, if any (encoding not established here — confirm).
    pub blob: Option<String>,
}
576
/// Log levels; serialized in snake_case.
///
/// The type derives `PartialEq` only, so levels can be compared for equality
/// but not ordered.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum MCPLogLevel {
    /// Serialized as `"debug"`.
    Debug,
    /// Serialized as `"info"`.
    Info,
    /// Serialized as `"notice"`.
    Notice,
    /// Serialized as `"warning"`.
    Warning,
    /// Serialized as `"error"`.
    Error,
    /// Serialized as `"critical"`.
    Critical,
    /// Serialized as `"alert"`.
    Alert,
    /// Serialized as `"emergency"`.
    Emergency,
}
598
/// A single log entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPLogEntry {
    /// Level of the entry.
    pub level: MCPLogLevel,
    /// Entry payload as JSON.
    pub data: Value,
    /// Optional logger name.
    pub logger: Option<String>,
}
609
/// Description of an MCP service hosted on a peer.
///
/// The discovery loop deserializes a `Vec<MCPService>` from the DHT and caches
/// the entries by `service_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPService {
    /// Identifier of the service; key of the remote service cache.
    pub service_id: String,
    /// Peer hosting the service; health checks are sent to this peer.
    pub node_id: PeerId,
    /// Names of the tools offered.
    pub tools: Vec<String>,
    /// Capabilities of the service.
    pub capabilities: MCPCapabilities,
    /// Descriptive metadata.
    pub metadata: MCPServiceMetadata,
    /// Registration time.
    pub registered_at: SystemTime,
    /// How to reach the service.
    pub endpoint: MCPEndpoint,
}
628
/// Descriptive metadata of an [`MCPService`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServiceMetadata {
    /// Service name.
    pub name: String,
    /// Service version string.
    pub version: String,
    /// Optional description.
    pub description: Option<String>,
    /// Free-form tags.
    pub tags: Vec<String>,
    /// Reported health status.
    pub health_status: ServiceHealthStatus,
    /// Reported load metrics.
    pub load_metrics: ServiceLoadMetrics,
}
645
/// Health state of a service.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum ServiceHealthStatus {
    /// Set once `success_count` reaches the configured success threshold.
    Healthy,
    /// Service is degraded.
    Degraded,
    /// Service is unhealthy.
    Unhealthy,
    /// Service is disabled.
    Disabled,
    /// Initial state of a freshly created health record.
    Unknown,
}
660
/// Load metrics reported for a service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceLoadMetrics {
    /// Number of requests currently in flight.
    pub active_requests: u32,
    /// Request rate per second.
    pub requests_per_second: f64,
    /// Average response time in milliseconds.
    pub avg_response_time_ms: f64,
    /// Error rate (scale not established here — fraction or percent; confirm).
    pub error_rate: f64,
    /// CPU usage (scale not established here — confirm).
    pub cpu_usage: f64,
    /// Memory usage (presumably bytes — confirm).
    pub memory_usage: u64,
}
677
/// Network endpoint of an [`MCPService`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPEndpoint {
    /// Protocol name.
    pub protocol: String,
    /// Address of the endpoint.
    pub address: String,
    /// Optional port.
    pub port: Option<u16>,
    /// Whether TLS is used.
    pub tls: bool,
    /// Whether authentication is required.
    pub auth_required: bool,
}
692
/// An MCP request together with its routing information.
#[derive(Debug, Clone)]
pub struct MCPRequest {
    /// Identifier of the request.
    pub request_id: String,
    /// Peer that issued the request.
    pub source_peer: PeerId,
    /// Peer the request is addressed to.
    pub target_peer: PeerId,
    /// The MCP message itself.
    pub message: MCPMessage,
    /// Creation time of the request.
    pub timestamp: SystemTime,
    /// Timeout for the request.
    pub timeout: Duration,
    /// Optional authentication token.
    pub auth_token: Option<String>,
}
711
/// Serializable envelope for MCP messages exchanged between peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct P2PMCPMessage {
    /// Kind of envelope.
    pub message_type: P2PMCPMessageType,
    /// Identifier of the message.
    pub message_id: String,
    /// Sending peer.
    pub source_peer: PeerId,
    /// Receiving peer, if the message is addressed to one.
    pub target_peer: Option<PeerId>,
    /// Timestamp as an integer (unit not established here — presumably seconds since the Unix epoch; confirm).
    pub timestamp: u64,
    /// Wrapped MCP message.
    pub payload: MCPMessage,
    /// Time-to-live counter (presumably a hop count — confirm with the forwarding code).
    pub ttl: u8,
}
730
/// Kind of a [`P2PMCPMessage`] envelope.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum P2PMCPMessageType {
    /// A request.
    Request,
    /// A response.
    Response,
    /// A service advertisement.
    ServiceAdvertisement,
    /// A service discovery message.
    ServiceDiscovery,
    /// A heartbeat.
    Heartbeat,
    /// A health check.
    HealthCheck,
}
747
/// Response to an [`MCPRequest`].
#[derive(Debug, Clone)]
pub struct MCPResponse {
    /// Identifier of the request being answered.
    pub request_id: String,
    /// The MCP message carried by the response.
    pub message: MCPMessage,
    /// Creation time of the response.
    pub timestamp: SystemTime,
    /// Time spent processing the request.
    pub processing_time: Duration,
}
760
/// Caller context passed to `MCPServer::call_tool`.
#[derive(Debug, Clone)]
pub struct MCPCallContext {
    /// Peer on whose behalf the call runs; used for rate-limit, permission and audit checks.
    pub caller_id: PeerId,
    /// Time of the call.
    pub timestamp: SystemTime,
    /// Upper bound on the tool execution time for this call.
    pub timeout: Duration,
    /// Authentication information; required for `SecurityLevel::Basic` tools when auth is enabled.
    pub auth_info: Option<MCPAuthInfo>,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
}
775
/// Authentication information attached to a call.
#[derive(Debug, Clone)]
pub struct MCPAuthInfo {
    /// The token; verified by `call_tool` for `SecurityLevel::Basic` tools.
    pub token: String,
    /// Type of the token.
    pub token_type: String,
    /// Expiry time, if any.
    pub expires_at: Option<SystemTime>,
    /// Permission names attached to the token.
    pub permissions: Vec<String>,
}
788
/// Configuration of an [`MCPServer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServerConfig {
    /// Server name, logged at startup.
    pub server_name: String,
    /// Server version string.
    pub server_version: String,
    /// Whether DHT discovery is enabled.
    pub enable_dht_discovery: bool,
    /// Maximum number of concurrent requests.
    pub max_concurrent_requests: usize,
    /// Timeout for a request.
    pub request_timeout: Duration,
    /// Whether authentication is enabled; when `true`, `MCPServer::new` creates a security manager.
    pub enable_auth: bool,
    /// Whether rate limiting is enabled.
    pub enable_rate_limiting: bool,
    /// Rate limit passed to the security manager (requests per minute, per the field name).
    pub rate_limit_rpm: u32,
    /// Whether logging is enabled.
    pub enable_logging: bool,
    /// Largest execution time a tool may declare in its requirements.
    pub max_tool_execution_time: Duration,
    /// Largest memory amount a tool may declare in its requirements.
    pub tool_memory_limit: u64,
    /// Health monitor settings.
    pub health_monitor: HealthMonitorConfig,
}
817
818impl Default for MCPServerConfig {
819 fn default() -> Self {
820 Self {
821 server_name: "P2P-MCP-Server".to_string(),
822 server_version: crate::VERSION.to_string(),
823 enable_dht_discovery: true,
824 max_concurrent_requests: 100,
825 request_timeout: DEFAULT_CALL_TIMEOUT,
826 enable_auth: true,
827 enable_rate_limiting: true,
828 rate_limit_rpm: 60,
829 enable_logging: true,
830 max_tool_execution_time: Duration::from_secs(30),
831 tool_memory_limit: 100 * 1024 * 1024, health_monitor: HealthMonitorConfig::default(),
833 }
834 }
835}
836
/// MCP server: tool registry, service caches, statistics and health monitoring.
pub struct MCPServer {
    /// Server configuration.
    config: MCPServerConfig,
    /// Registered tools, keyed by tool name.
    tools: Arc<RwLock<HashMap<String, Tool>>>,
    /// Prompts keyed by name (currently unused).
    #[allow(dead_code)]
    prompts: Arc<RwLock<HashMap<String, MCPPrompt>>>,
    /// Resources keyed by string (currently unused).
    #[allow(dead_code)]
    resources: Arc<RwLock<HashMap<String, MCPResource>>>,
    /// Sessions keyed by string.
    sessions: Arc<RwLock<HashMap<String, MCPSession>>>,
    /// One-shot response channels keyed by string (presumably request ID — confirm).
    request_handlers: Arc<RwLock<HashMap<String, oneshot::Sender<MCPResponse>>>>,
    /// Optional DHT used for tool registration and service discovery.
    dht: Option<Arc<RwLock<DHT>>>,
    /// Services hosted locally.
    local_services: Arc<RwLock<HashMap<String, MCPService>>>,
    /// Cache of discovered remote services, keyed by service ID.
    remote_services: Arc<RwLock<HashMap<String, MCPService>>>,
    /// Aggregate statistics.
    stats: Arc<RwLock<MCPServerStats>>,
    /// Request sender; the matching receiver is dropped in `new`.
    request_tx: mpsc::UnboundedSender<MCPRequest>,
    /// Response receiver; the matching sender is dropped in `new`.
    #[allow(dead_code)]
    response_rx: Arc<RwLock<mpsc::UnboundedReceiver<MCPResponse>>>,
    /// Security manager; `None` when `config.enable_auth` is `false`.
    security_manager: Option<Arc<MCPSecurityManager>>,
    /// Audit logger for security-relevant events.
    audit_logger: Arc<SecurityAuditLogger>,
    /// Network transport; `None` until a sender is configured.
    network_sender: Arc<RwLock<Option<Arc<dyn NetworkSender>>>>,
    /// Health records keyed by service ID.
    service_health: Arc<RwLock<HashMap<String, ServiceHealth>>>,
    /// Sender side of the health event channel.
    health_event_tx: mpsc::UnboundedSender<HealthEvent>,
    /// Receiver side of the health event channel (currently unused).
    #[allow(dead_code)]
    health_event_rx: Arc<RwLock<mpsc::UnboundedReceiver<HealthEvent>>>,
    /// Local peer ID; set only by `with_network_sender`.
    node_id: Option<PeerId>,
}
882
/// State of one MCP session with a peer.
#[derive(Debug, Clone)]
pub struct MCPSession {
    /// Identifier of the session.
    pub session_id: String,
    /// Peer the session belongs to.
    pub peer_id: PeerId,
    /// Capabilities declared by the client, once known.
    pub client_capabilities: Option<MCPCapabilities>,
    /// Start time of the session.
    pub started_at: SystemTime,
    /// Time of the last activity.
    pub last_activity: SystemTime,
    /// Lifecycle state.
    pub state: MCPSessionState,
    /// Resources (presumably URIs — confirm) the session subscribed to.
    pub subscribed_resources: Vec<String>,
}
901
/// Lifecycle state of an [`MCPSession`].
#[derive(Debug, Clone, PartialEq)]
pub enum MCPSessionState {
    /// Session is being initialized.
    Initializing,
    /// Session is active.
    Active,
    /// Session is inactive.
    Inactive,
    /// Session has been terminated.
    Terminated,
}
914
/// Aggregate server statistics.
#[derive(Debug, Clone)]
pub struct MCPServerStats {
    /// Number of requests counted.
    pub total_requests: u64,
    /// Number of responses counted.
    pub total_responses: u64,
    /// Number of errors counted.
    pub total_errors: u64,
    /// Running mean of response time over `total_responses`.
    pub avg_response_time: Duration,
    /// Number of active sessions.
    pub active_sessions: u32,
    /// Number of registered tools; incremented by `register_tool`.
    pub total_tools: u32,
    /// Per-tool call counters keyed by tool name.
    pub popular_tools: HashMap<String, u64>,
    /// Time the statistics object was created.
    pub server_started_at: SystemTime,
}
935
936impl Default for MCPServerStats {
937 fn default() -> Self {
938 Self {
939 total_requests: 0,
940 total_responses: 0,
941 total_errors: 0,
942 avg_response_time: Duration::from_millis(0),
943 active_sessions: 0,
944 total_tools: 0,
945 popular_tools: HashMap::new(),
946 server_started_at: SystemTime::now(),
947 }
948 }
949}
950
951impl MCPServer {
952 pub fn new(config: MCPServerConfig) -> Self {
954 let (request_tx, _request_rx) = mpsc::unbounded_channel();
955 let (_response_tx, response_rx) = mpsc::unbounded_channel();
956 let (health_event_tx, health_event_rx) = mpsc::unbounded_channel();
957
958 let security_manager = if config.enable_auth {
960 let secret_key = (0..32).map(|_| rand::random::<u8>()).collect();
962 Some(Arc::new(MCPSecurityManager::new(
963 secret_key,
964 config.rate_limit_rpm,
965 )))
966 } else {
967 None
968 };
969
970 Self {
971 config,
972 tools: Arc::new(RwLock::new(HashMap::new())),
973 prompts: Arc::new(RwLock::new(HashMap::new())),
974 resources: Arc::new(RwLock::new(HashMap::new())),
975 sessions: Arc::new(RwLock::new(HashMap::new())),
976 request_handlers: Arc::new(RwLock::new(HashMap::new())),
977 dht: None,
978 local_services: Arc::new(RwLock::new(HashMap::new())),
979 remote_services: Arc::new(RwLock::new(HashMap::new())),
980 stats: Arc::new(RwLock::new(MCPServerStats::default())),
981 request_tx,
982 response_rx: Arc::new(RwLock::new(response_rx)),
983 security_manager,
984 audit_logger: Arc::new(SecurityAuditLogger::new(10000)), network_sender: Arc::new(RwLock::new(None)),
986 service_health: Arc::new(RwLock::new(HashMap::new())),
987 health_event_tx,
988 health_event_rx: Arc::new(RwLock::new(health_event_rx)),
989 node_id: None,
990 }
991 }
992
    /// Builder-style setter: attaches `dht` to the server and returns it.
    ///
    /// With a DHT attached, `start` also launches service discovery and
    /// `register_tool` publishes each tool to the DHT.
    pub fn with_dht(mut self, dht: Arc<RwLock<DHT>>) -> Self {
        self.dht = Some(dht);
        self
    }
998
999 pub async fn with_network_sender(mut self, sender: Arc<dyn NetworkSender>) -> Self {
1001 let peer_id = sender.local_peer_id().clone();
1002 self.node_id = Some(peer_id);
1003 *self.network_sender.write().await = Some(sender);
1004 self
1005 }
1006
    /// Installs (or replaces) the network transport on an existing server.
    ///
    /// NOTE(review): unlike `with_network_sender`, this does not update
    /// `node_id`, so `get_node_id_string` keeps returning its previous value
    /// (`"uninitialized"` if the builder was never used). Confirm this is intended.
    pub async fn set_network_sender(&self, sender: Arc<dyn NetworkSender>) {
        // Read the peer ID before `sender` is moved into the lock.
        let peer_id = sender.local_peer_id().clone();
        *self.network_sender.write().await = Some(sender);
        info!("MCP server network sender configured for peer {}", peer_id);
    }
1013
1014 fn get_node_id_string(&self) -> String {
1016 self.node_id
1017 .as_ref()
1018 .map(|id| id.to_string())
1019 .unwrap_or_else(|| "uninitialized".to_string())
1020 }
1021
    /// Starts the server's background tasks.
    ///
    /// Launches the request processor, then service discovery (only when a DHT
    /// is attached), then the health monitor. Returns the first error any of
    /// these start-up steps reports.
    pub async fn start(&self) -> Result<()> {
        info!("Starting MCP server: {}", self.config.server_name);

        self.start_request_processor().await?;

        // Discovery polls the DHT, so it is only useful when one is attached.
        if self.dht.is_some() {
            self.start_service_discovery().await?;
        }

        self.start_health_monitor().await?;

        info!("MCP server started successfully");
        Ok(())
    }
1040
1041 pub async fn register_tool(&self, tool: Tool) -> Result<()> {
1043 let tool_name = tool.definition.name.clone();
1044
1045 self.validate_tool(&tool).await?;
1047
1048 {
1050 let mut tools = self.tools.write().await;
1051 tools.insert(tool_name.clone(), tool);
1052 }
1053
1054 {
1056 let mut stats = self.stats.write().await;
1057 stats.total_tools += 1;
1058 }
1059
1060 if let Some(dht) = &self.dht {
1062 self.register_tool_in_dht(&tool_name, dht).await?;
1063 }
1064
1065 if let Err(e) = self.announce_local_services().await {
1067 warn!("Failed to announce service after tool registration: {}", e);
1068 }
1069
1070 info!("Registered tool: {}", tool_name);
1071 Ok(())
1072 }
1073
1074 async fn validate_tool(&self, tool: &Tool) -> Result<()> {
1076 let tools = self.tools.read().await;
1078 if tools.contains_key(&tool.definition.name) {
1079 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1080 format!("Tool already exists: {}", tool.definition.name).into(),
1081 )));
1082 }
1083
1084 if tool.definition.name.is_empty() || tool.definition.name.len() > 100 {
1086 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1087 "Invalid tool name".to_string().into(),
1088 )));
1089 }
1090
1091 if !tool.definition.input_schema.is_object() {
1093 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1094 "Tool input schema must be an object".to_string().into(),
1095 )));
1096 }
1097
1098 Ok(())
1099 }
1100
1101 async fn register_tool_in_dht(&self, tool_name: &str, dht: &Arc<RwLock<DHT>>) -> Result<()> {
1103 let key = Key::new(format!("mcp:tool:{tool_name}").as_bytes());
1104 let service_info = json!({
1105 "tool_name": tool_name,
1106 "node_id": self.get_node_id_string(),
1107 "registered_at": SystemTime::now().duration_since(std::time::UNIX_EPOCH).map_err(|e| P2PError::Identity(crate::error::IdentityError::SystemTime(format!("Time error: {e}").into())))?.as_secs(),
1108 "capabilities": self.get_server_capabilities().await
1109 });
1110
1111 let dht_guard = dht.read().await;
1112 dht_guard
1113 .put(key, serde_json::to_vec(&service_info)?)
1114 .await?;
1115
1116 Ok(())
1117 }
1118
1119 async fn get_server_capabilities(&self) -> MCPCapabilities {
1121 MCPCapabilities {
1122 experimental: None,
1123 sampling: None,
1124 tools: Some(MCPToolsCapability {
1125 list_changed: Some(true),
1126 }),
1127 prompts: Some(MCPPromptsCapability {
1128 list_changed: Some(true),
1129 }),
1130 resources: Some(MCPResourcesCapability {
1131 subscribe: Some(true),
1132 list_changed: Some(true),
1133 }),
1134 logging: Some(MCPLoggingCapability {
1135 levels: Some(vec![
1136 MCPLogLevel::Debug,
1137 MCPLogLevel::Info,
1138 MCPLogLevel::Warning,
1139 MCPLogLevel::Error,
1140 ]),
1141 }),
1142 }
1143 }
1144
1145 pub async fn call_tool(
1147 &self,
1148 tool_name: &str,
1149 arguments: Value,
1150 context: MCPCallContext,
1151 ) -> Result<Value> {
1152 let start_time = Instant::now();
1153
1154 if !self.check_rate_limit(&context.caller_id).await? {
1158 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1159 "Rate limit exceeded".to_string().into(),
1160 )));
1161 }
1162
1163 if !self
1165 .check_permission(&context.caller_id, &MCPPermission::ExecuteTools)
1166 .await?
1167 {
1168 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1169 "Permission denied: execute tools".to_string().into(),
1170 )));
1171 }
1172
1173 let tool_security_level = self.get_tool_security_policy(tool_name).await;
1175 let is_trusted = self.is_trusted_peer(&context.caller_id).await;
1176
1177 match tool_security_level {
1178 SecurityLevel::Admin => {
1179 if !self
1180 .check_permission(&context.caller_id, &MCPPermission::Admin)
1181 .await?
1182 {
1183 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1184 "Permission denied: admin access required"
1185 .to_string()
1186 .into(),
1187 )));
1188 }
1189 }
1190 SecurityLevel::Strong => {
1191 if !is_trusted {
1192 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1193 "Permission denied: trusted peer required"
1194 .to_string()
1195 .into(),
1196 )));
1197 }
1198 }
1199 SecurityLevel::Basic => {
1200 if self.config.enable_auth {
1202 if let Some(auth_info) = &context.auth_info {
1203 self.verify_auth_token(&auth_info.token).await?;
1204 } else {
1205 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1206 "Authentication required".to_string().into(),
1207 )));
1208 }
1209 }
1210 }
1211 SecurityLevel::Public => {
1212 }
1214 }
1215
1216 let mut details = HashMap::new();
1218 details.insert("action".to_string(), "tool_call".to_string());
1219 details.insert("tool_name".to_string(), tool_name.to_string());
1220 details.insert(
1221 "security_level".to_string(),
1222 format!("{tool_security_level:?}"),
1223 );
1224
1225 self.audit_logger
1226 .log_event(
1227 "tool_execution".to_string(),
1228 context.caller_id.clone(),
1229 details,
1230 AuditSeverity::Info,
1231 )
1232 .await;
1233
1234 let tool_exists = {
1236 let tools = self.tools.read().await;
1237 tools.contains_key(tool_name)
1238 };
1239
1240 if !tool_exists {
1241 return Err(P2PError::Mcp(crate::error::McpError::ToolNotFound(
1242 tool_name.to_string().into(),
1243 )));
1244 }
1245
1246 let requirements = {
1248 let tools = self.tools.read().await;
1249 let tool = tools.get(tool_name).ok_or_else(|| {
1250 P2PError::Mcp(crate::error::McpError::ToolNotFound(
1251 tool_name.to_string().into(),
1252 ))
1253 })?;
1254
1255 if let Err(e) = tool.handler.validate(&arguments) {
1257 return Err(P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1258 format!("{}: Validation failed: {e}", tool_name).into(),
1259 )));
1260 }
1261
1262 tool.handler.get_requirements()
1264 };
1265
1266 self.check_resource_requirements(&requirements).await?;
1268
1269 let tools_clone = self.tools.clone();
1271 let tool_name_owned = tool_name.to_string();
1272 let execution_timeout = context
1273 .timeout
1274 .min(requirements.max_execution_time.unwrap_or(context.timeout));
1275
1276 let result = timeout(execution_timeout, async move {
1277 let tools = tools_clone.read().await;
1278 let tool = tools.get(&tool_name_owned).ok_or_else(|| {
1279 P2PError::Mcp(crate::error::McpError::ToolNotFound(
1280 tool_name_owned.clone().into(),
1281 ))
1282 })?;
1283 tool.handler.execute(arguments).await
1284 })
1285 .await
1286 .map_err(|_| {
1287 P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1288 format!("{}: Tool execution timeout", tool_name).into(),
1289 ))
1290 })?
1291 .map_err(|e| {
1292 P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1293 format!("{}: {e}", tool_name).into(),
1294 ))
1295 })?;
1296
1297 let execution_time = start_time.elapsed();
1298
1299 self.update_tool_stats(tool_name, execution_time, true)
1301 .await;
1302
1303 {
1305 let mut stats = self.stats.write().await;
1306 stats.total_requests += 1;
1307 stats.total_responses += 1;
1308
1309 let new_total_time = stats
1311 .avg_response_time
1312 .mul_f64(stats.total_responses as f64 - 1.0)
1313 + execution_time;
1314 stats.avg_response_time = new_total_time.div_f64(stats.total_responses as f64);
1315
1316 *stats
1318 .popular_tools
1319 .entry(tool_name.to_string())
1320 .or_insert(0) += 1;
1321 }
1322
1323 debug!("Tool '{}' executed in {:?}", tool_name, execution_time);
1324 Ok(result)
1325 }
1326
1327 async fn check_resource_requirements(&self, requirements: &ToolRequirements) -> Result<()> {
1329 if let Some(max_memory) = requirements.max_memory
1331 && max_memory > self.config.tool_memory_limit
1332 {
1333 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1334 "Tool memory requirement exceeds limit".to_string().into(),
1335 )));
1336 }
1337
1338 if let Some(max_execution_time) = requirements.max_execution_time
1340 && max_execution_time > self.config.max_tool_execution_time
1341 {
1342 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1343 "Tool execution time requirement exceeds limit"
1344 .to_string()
1345 .into(),
1346 )));
1347 }
1348
1349 Ok(())
1352 }
1353
1354 async fn update_tool_stats(&self, tool_name: &str, execution_time: Duration, success: bool) {
1356 let mut tools = self.tools.write().await;
1357 if let Some(tool) = tools.get_mut(tool_name) {
1358 tool.metadata.call_count += 1;
1359 tool.metadata.last_called = Some(SystemTime::now());
1360
1361 let new_total_time = tool
1363 .metadata
1364 .avg_execution_time
1365 .mul_f64(tool.metadata.call_count as f64 - 1.0)
1366 + execution_time;
1367 tool.metadata.avg_execution_time =
1368 new_total_time.div_f64(tool.metadata.call_count as f64);
1369
1370 if !success {
1372 tool.metadata.health_status = match tool.metadata.health_status {
1373 ToolHealthStatus::Healthy => ToolHealthStatus::Degraded,
1374 ToolHealthStatus::Degraded => ToolHealthStatus::Unhealthy,
1375 other => other,
1376 };
1377 } else if tool.metadata.health_status != ToolHealthStatus::Disabled {
1378 tool.metadata.health_status = ToolHealthStatus::Healthy;
1379 }
1380 }
1381 }
1382
1383 pub async fn list_tools(
1385 &self,
1386 _cursor: Option<String>,
1387 ) -> Result<(Vec<MCPTool>, Option<String>)> {
1388 let tools = self.tools.read().await;
1389 let tool_definitions: Vec<MCPTool> =
1390 tools.values().map(|tool| tool.definition.clone()).collect();
1391
1392 Ok((tool_definitions, None))
1395 }
1396
1397 async fn start_request_processor(&self) -> Result<()> {
1399 let _request_tx = self.request_tx.clone();
1400 let _server_clone = Arc::new(self);
1401
1402 tokio::spawn(async move {
1403 info!("MCP request processor started");
1404
1405 tokio::time::sleep(Duration::from_millis(100)).await;
1411
1412 info!("MCP request processor stopped");
1413 });
1414
1415 Ok(())
1416 }
1417
1418 async fn start_service_discovery(&self) -> Result<()> {
1420 if let Some(dht) = self.dht.clone() {
1421 let _stats = self.stats.clone();
1422 let remote_services = self.remote_services.clone();
1423
1424 tokio::spawn(async move {
1425 info!("MCP service discovery started");
1426
1427 loop {
1428 tokio::time::sleep(SERVICE_DISCOVERY_INTERVAL).await;
1430
1431 let key = Key::new(b"mcp:services");
1433 let dht_guard = dht.read().await;
1434
1435 match dht_guard.get(&key).await {
1436 Some(record) => {
1437 match serde_json::from_slice::<Vec<MCPService>>(&record.value) {
1438 Ok(services) => {
1439 debug!("Discovered {} MCP services", services.len());
1440
1441 {
1443 let mut remote_cache = remote_services.write().await;
1444 for service in services {
1445 remote_cache
1446 .insert(service.service_id.clone(), service);
1447 }
1448 }
1449 }
1450 Err(e) => {
1451 debug!("Failed to deserialize services: {}", e);
1452 }
1453 }
1454 }
1455 None => {
1456 debug!("No MCP services found in DHT");
1457 }
1458 }
1459 }
1460 });
1461 }
1462
1463 Ok(())
1464 }
1465
1466 async fn start_health_monitor(&self) -> Result<()> {
1468 if !self.config.health_monitor.enabled {
1469 debug!("Health monitoring is disabled");
1470 return Ok(());
1471 }
1472
1473 info!(
1474 "Starting health monitoring with interval: {:?}",
1475 self.config.health_monitor.health_check_interval
1476 );
1477
1478 let service_health = Arc::clone(&self.service_health);
1480 let remote_services = Arc::clone(&self.remote_services);
1481 let network_sender = Arc::clone(&self.network_sender);
1482 let health_event_tx = self.health_event_tx.clone();
1483 let config = self.config.health_monitor.clone();
1484
1485 let health_check_task = {
1487 let service_health = Arc::clone(&service_health);
1488 let remote_services = Arc::clone(&remote_services);
1489 let network_sender = Arc::clone(&network_sender);
1490 let health_event_tx = health_event_tx.clone();
1491 let config = config.clone();
1492
1493 tokio::spawn(async move {
1494 let mut interval = tokio::time::interval(config.health_check_interval);
1495
1496 loop {
1497 interval.tick().await;
1498
1499 let services_to_check: Vec<MCPService> = {
1501 let remote_guard = remote_services.read().await;
1502 remote_guard.values().cloned().collect()
1503 };
1504
1505 for service in services_to_check {
1507 if let Some(sender) = network_sender.read().await.as_ref() {
1508 Self::perform_health_check(
1509 &service,
1510 sender.as_ref(),
1511 &service_health,
1512 &health_event_tx,
1513 &config,
1514 )
1515 .await;
1516 }
1517 }
1518 }
1519 })
1520 };
1521
1522 let heartbeat_task = {
1524 let network_sender = Arc::clone(&network_sender);
1525 let health_event_tx = health_event_tx.clone();
1526 let config = config.clone();
1527
1528 tokio::spawn(async move {
1529 let mut interval = tokio::time::interval(config.heartbeat_interval);
1530
1531 loop {
1532 interval.tick().await;
1533
1534 if let Some(sender) = network_sender.read().await.as_ref() {
1535 Self::send_heartbeat(sender.as_ref(), &health_event_tx).await;
1536 }
1537 }
1538 })
1539 };
1540
1541 let timeout_task = {
1543 let service_health = Arc::clone(&service_health);
1544 let health_event_tx = health_event_tx.clone();
1545 let config = config.clone();
1546
1547 tokio::spawn(async move {
1548 let mut interval = tokio::time::interval(Duration::from_secs(30)); loop {
1551 interval.tick().await;
1552
1553 Self::check_heartbeat_timeouts(&service_health, &health_event_tx, &config)
1554 .await;
1555 }
1556 })
1557 };
1558
1559 tokio::spawn(async move {
1561 tokio::select! {
1562 _ = health_check_task => debug!("Health check task completed"),
1563 _ = heartbeat_task => debug!("Heartbeat task completed"),
1564 _ = timeout_task => debug!("Timeout monitoring task completed"),
1565 }
1566 });
1567
1568 info!("Health monitoring started successfully");
1569 Ok(())
1570 }
1571
1572 async fn perform_health_check(
1574 service: &MCPService,
1575 network_sender: &dyn NetworkSender,
1576 service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1577 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1578 config: &HealthMonitorConfig,
1579 ) {
1580 let start_time = Instant::now();
1581 let service_id = service.service_id.clone();
1582 let peer_id = service.node_id.clone();
1583
1584 let health_check_message = MCPMessage::CallTool {
1587 name: "health_check".to_string(),
1588 arguments: json!({
1589 "service_id": service_id,
1590 "timestamp": SystemTime::now()
1591 .duration_since(SystemTime::UNIX_EPOCH)
1592 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
1593 .as_secs()
1594 }),
1595 };
1596
1597 let result = match serde_json::to_vec(&health_check_message) {
1599 Ok(data) => {
1600 timeout(
1601 config.health_check_timeout,
1602 network_sender.send_message(&peer_id, MCP_PROTOCOL, data),
1603 )
1604 .await
1605 }
1606 Err(e) => {
1607 debug!("Failed to serialize health check message: {}", e);
1608 return;
1609 }
1610 };
1611
1612 let response_time = start_time.elapsed();
1613 let success = result.as_ref().map(|r| r.is_ok()).unwrap_or(false);
1614
1615 let mut health_guard = service_health.write().await;
1617 let health = health_guard
1618 .entry(service_id.clone())
1619 .or_insert_with(|| ServiceHealth {
1620 service_id: service_id.clone(),
1621 status: ServiceHealthStatus::Unknown,
1622 last_health_check: None,
1623 last_heartbeat: None,
1624 failure_count: 0,
1625 success_count: 0,
1626 avg_response_time: Duration::from_millis(0),
1627 error_message: None,
1628 health_history: Vec::new(),
1629 });
1630
1631 let check_result = HealthCheckResult {
1633 timestamp: SystemTime::now(),
1634 success,
1635 response_time,
1636 error_message: if success {
1637 None
1638 } else {
1639 Some("Health check failed".to_string())
1640 },
1641 };
1642
1643 health.health_history.push(check_result);
1644 if health.health_history.len() > 10 {
1645 health.health_history.remove(0);
1646 }
1647
1648 let previous_status = health.status;
1650 if success {
1651 health.failure_count = 0;
1652 health.success_count += 1;
1653 health.last_health_check = Some(SystemTime::now());
1654
1655 if health.success_count >= config.success_threshold {
1656 health.status = ServiceHealthStatus::Healthy;
1657 health.error_message = None;
1658 }
1659 } else {
1660 health.success_count = 0;
1661 health.failure_count += 1;
1662
1663 if health.failure_count >= config.failure_threshold {
1664 health.status = ServiceHealthStatus::Unhealthy;
1665 health.error_message = Some("Health check failures exceeded threshold".to_string());
1666 }
1667 }
1668
1669 let total_time: Duration = health.health_history.iter().map(|h| h.response_time).sum();
1671 health.avg_response_time = total_time / health.health_history.len() as u32;
1672
1673 if previous_status != health.status {
1675 let event = match health.status {
1676 ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1677 service_id: service_id.clone(),
1678 peer_id: peer_id.clone(),
1679 },
1680 ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1681 service_id: service_id.clone(),
1682 peer_id: peer_id.clone(),
1683 error: health
1684 .error_message
1685 .clone()
1686 .unwrap_or_else(|| "Unknown error".to_string()),
1687 },
1688 ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1689 service_id: service_id.clone(),
1690 peer_id: peer_id.clone(),
1691 reason: "Performance degradation detected".to_string(),
1692 },
1693 _ => return, };
1695
1696 if let Err(e) = health_event_tx.send(event) {
1697 debug!("Failed to send health event: {}", e);
1698 }
1699 }
1700 }
1701
1702 async fn send_heartbeat(
1704 network_sender: &dyn NetworkSender,
1705 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1706 ) {
1707 let load = {
1710 0.2 };
1714
1715 let available_tools = vec!["query".to_string(), "update".to_string()]; let heartbeat = Heartbeat {
1720 service_id: "mcp-server".to_string(),
1721 peer_id: network_sender.local_peer_id().clone(),
1722 timestamp: SystemTime::now(),
1723 load,
1724 available_tools,
1725 capabilities: MCPCapabilities {
1726 experimental: None,
1727 sampling: None,
1728 tools: Some(MCPToolsCapability {
1729 list_changed: Some(true),
1730 }),
1731 prompts: None,
1732 resources: None,
1733 logging: None,
1734 },
1735 };
1736
1737 let heartbeat_message = MCPMessage::CallTool {
1739 name: "heartbeat".to_string(),
1740 arguments: serde_json::to_value(&heartbeat).unwrap_or(json!({})),
1741 };
1742
1743 if let Ok(_data) = serde_json::to_vec(&heartbeat_message) {
1744 debug!("Sending heartbeat for service: {}", heartbeat.service_id);
1747
1748 let event = HealthEvent::HeartbeatReceived {
1750 service_id: heartbeat.service_id.clone(),
1751 peer_id: heartbeat.peer_id.clone(),
1752 load: heartbeat.load,
1753 };
1754
1755 if let Err(e) = health_event_tx.send(event) {
1756 debug!("Failed to send heartbeat event: {}", e);
1757 }
1758 }
1759 }
1760
1761 async fn check_heartbeat_timeouts(
1763 service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1764 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1765 config: &HealthMonitorConfig,
1766 ) {
1767 let now = SystemTime::now();
1768 let mut health_guard = service_health.write().await;
1769
1770 for (service_id, health) in health_guard.iter_mut() {
1771 if let Some(last_heartbeat) = health.last_heartbeat
1772 && let Ok(duration) = now.duration_since(last_heartbeat)
1773 && duration > config.heartbeat_timeout
1774 {
1775 let previous_status = health.status;
1776 health.status = ServiceHealthStatus::Unhealthy;
1777 health.error_message = Some("Heartbeat timeout".to_string());
1778
1779 if previous_status != ServiceHealthStatus::Unhealthy {
1781 let event = HealthEvent::HeartbeatTimeout {
1782 service_id: service_id.clone(),
1783 peer_id: PeerId::from("unknown".to_string()), };
1785
1786 if let Err(e) = health_event_tx.send(event) {
1787 debug!("Failed to send timeout event: {}", e);
1788 }
1789 }
1790 }
1791 }
1792 }
1793
1794 pub async fn get_stats(&self) -> MCPServerStats {
1796 self.stats.read().await.clone()
1797 }
1798
1799 pub async fn handle_heartbeat(&self, heartbeat: Heartbeat) -> Result<()> {
1801 let service_id = heartbeat.service_id.clone();
1802 let peer_id = heartbeat.peer_id.clone();
1803
1804 {
1806 let mut health_guard = self.service_health.write().await;
1807 let health = health_guard
1808 .entry(service_id.clone())
1809 .or_insert_with(|| ServiceHealth {
1810 service_id: service_id.clone(),
1811 status: ServiceHealthStatus::Healthy,
1812 last_health_check: None,
1813 last_heartbeat: None,
1814 failure_count: 0,
1815 success_count: 0,
1816 avg_response_time: Duration::from_millis(0),
1817 error_message: None,
1818 health_history: Vec::new(),
1819 });
1820
1821 health.last_heartbeat = Some(heartbeat.timestamp);
1822 health.status = ServiceHealthStatus::Healthy;
1823 health.failure_count = 0;
1824 health.error_message = None;
1825 }
1826
1827 let event = HealthEvent::HeartbeatReceived {
1829 service_id,
1830 peer_id,
1831 load: heartbeat.load,
1832 };
1833
1834 if let Err(e) = self.health_event_tx.send(event) {
1835 debug!("Failed to send heartbeat received event: {}", e);
1836 }
1837
1838 info!(
1839 "Heartbeat received from service: {} (load: {:.2})",
1840 heartbeat.service_id, heartbeat.load
1841 );
1842 Ok(())
1843 }
1844
1845 pub async fn get_service_health(&self, service_id: &str) -> Option<ServiceHealth> {
1847 let health_guard = self.service_health.read().await;
1848 health_guard.get(service_id).cloned()
1849 }
1850
1851 pub async fn get_all_service_health(&self) -> HashMap<String, ServiceHealth> {
1853 self.service_health.read().await.clone()
1854 }
1855
1856 pub async fn get_healthy_services(&self) -> Vec<String> {
1858 let health_guard = self.service_health.read().await;
1859 health_guard
1860 .iter()
1861 .filter(|(_, health)| health.status == ServiceHealthStatus::Healthy)
1862 .map(|(service_id, _)| service_id.clone())
1863 .collect()
1864 }
1865
1866 pub async fn update_service_health(
1868 &self,
1869 service_id: String,
1870 status: ServiceHealthStatus,
1871 error_message: Option<String>,
1872 ) {
1873 let mut health_guard = self.service_health.write().await;
1874 if let Some(health) = health_guard.get_mut(&service_id) {
1875 let previous_status = health.status;
1876 health.status = status;
1877 health.error_message = error_message.clone();
1878
1879 if previous_status != status {
1881 let event = match status {
1882 ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1883 service_id: service_id.clone(),
1884 peer_id: PeerId::from("manual".to_string()),
1885 },
1886 ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1887 service_id: service_id.clone(),
1888 peer_id: PeerId::from("manual".to_string()),
1889 error: error_message
1890 .unwrap_or_else(|| "Manually set to unhealthy".to_string()),
1891 },
1892 ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1893 service_id: service_id.clone(),
1894 peer_id: PeerId::from("manual".to_string()),
1895 reason: "Manually set to degraded".to_string(),
1896 },
1897 _ => return,
1898 };
1899
1900 if let Err(e) = self.health_event_tx.send(event) {
1901 debug!("Failed to send manual health update event: {}", e);
1902 }
1903 }
1904 }
1905 }
1906
1907 pub fn subscribe_health_events(&self) -> mpsc::UnboundedReceiver<HealthEvent> {
1909 let (_tx, rx) = mpsc::unbounded_channel();
1912 rx
1913 }
1914
1915 pub async fn is_service_healthy(&self, service_id: &str) -> bool {
1917 if let Some(health) = self.get_service_health(service_id).await {
1918 health.status == ServiceHealthStatus::Healthy
1919 } else {
1920 false
1921 }
1922 }
1923
1924 pub async fn get_service_load_info(&self) -> HashMap<String, f32> {
1926 let health_guard = self.service_health.read().await;
1929 health_guard
1930 .iter()
1931 .map(|(service_id, health)| {
1932 let load = match health.status {
1933 ServiceHealthStatus::Healthy => 0.1,
1934 ServiceHealthStatus::Degraded => 0.7,
1935 ServiceHealthStatus::Unhealthy => 1.0,
1936 ServiceHealthStatus::Disabled => 0.0,
1937 ServiceHealthStatus::Unknown => 0.5,
1938 };
1939 (service_id.clone(), load)
1940 })
1941 .collect()
1942 }
1943
1944 pub async fn call_remote_tool(
1946 &self,
1947 peer_id: &PeerId,
1948 tool_name: &str,
1949 arguments: Value,
1950 context: MCPCallContext,
1951 ) -> Result<Value> {
1952 let request_id = uuid::Uuid::new_v4().to_string();
1953
1954 let mcp_message = MCPMessage::CallTool {
1956 name: tool_name.to_string(),
1957 arguments,
1958 };
1959
1960 let p2p_message = P2PMCPMessage {
1962 message_type: P2PMCPMessageType::Request,
1963 message_id: request_id.clone(),
1964 source_peer: context.caller_id.clone(),
1965 target_peer: Some(peer_id.clone()),
1966 timestamp: SystemTime::now()
1967 .duration_since(std::time::UNIX_EPOCH)
1968 .map_err(|e| {
1969 P2PError::Identity(crate::error::IdentityError::SystemTime(
1970 format!("Time error: {e}").into(),
1971 ))
1972 })?
1973 .as_secs(),
1974 payload: mcp_message,
1975 ttl: 5, };
1977
1978 let message_data = serde_json::to_vec(&p2p_message)
1980 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1981
1982 if message_data.len() > MAX_MESSAGE_SIZE {
1983 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1984 "Message too large".to_string().into(),
1985 )));
1986 }
1987
1988 let (response_tx, _response_rx) = oneshot::channel::<MCPResponse>();
1990
1991 {
1993 let mut handlers = self.request_handlers.write().await;
1994 handlers.insert(request_id.clone(), response_tx);
1995 }
1996
1997 if let Some(ref network_sender) = *self.network_sender.read().await {
1999 network_sender
2001 .send_message(peer_id, MCP_PROTOCOL, message_data)
2002 .await?;
2003
2004 debug!(
2007 "MCP remote tool call sent to peer {}, tool: {}",
2008 peer_id, tool_name
2009 );
2010
2011 Ok(json!({
2014 "status": "sent",
2015 "message": "Remote tool call sent successfully",
2016 "peer_id": peer_id,
2017 "tool_name": tool_name
2018 }))
2019 } else {
2020 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2021 "Network sender not configured".to_string().into(),
2022 )))
2023 }
2024 }
2025
2026 pub async fn handle_p2p_message(
2028 &self,
2029 message_data: &[u8],
2030 source_peer: &PeerId,
2031 ) -> Result<Option<Vec<u8>>> {
2032 let p2p_message: P2PMCPMessage = serde_json::from_slice(message_data)
2034 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2035
2036 debug!(
2037 "Received MCP message from {}: {:?}",
2038 source_peer, p2p_message.message_type
2039 );
2040
2041 if let MCPMessage::CallTool { name, arguments } = &p2p_message.payload {
2043 if name == "heartbeat" {
2044 if let Ok(heartbeat) = serde_json::from_value::<Heartbeat>(arguments.clone()) {
2045 self.handle_heartbeat(heartbeat).await?;
2046 return Ok(None);
2047 }
2048 } else if name == "health_check" {
2049 let health_response = MCPMessage::CallToolResult {
2051 content: vec![],
2052 is_error: false,
2053 };
2054
2055 let response_message = P2PMCPMessage {
2056 message_type: P2PMCPMessageType::Response,
2057 message_id: p2p_message.message_id.clone(),
2058 source_peer: source_peer.clone(),
2059 target_peer: Some(p2p_message.source_peer.clone()),
2060 timestamp: SystemTime::now()
2061 .duration_since(SystemTime::UNIX_EPOCH)
2062 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
2063 .as_secs(),
2064 payload: health_response,
2065 ttl: 3,
2066 };
2067
2068 return Ok(Some(serde_json::to_vec(&response_message)?));
2069 }
2070 }
2071
2072 match p2p_message.message_type {
2073 P2PMCPMessageType::Request => self.handle_remote_request(p2p_message).await,
2074 P2PMCPMessageType::Response => {
2075 self.handle_remote_response(p2p_message).await?;
2076 Ok(None) }
2078 P2PMCPMessageType::ServiceAdvertisement => {
2079 self.handle_service_advertisement(p2p_message).await?;
2080 Ok(None)
2081 }
2082 P2PMCPMessageType::ServiceDiscovery => self.handle_service_discovery(p2p_message).await,
2083 P2PMCPMessageType::Heartbeat => {
2084 debug!("Received heartbeat message");
2085 Ok(None)
2086 }
2087 P2PMCPMessageType::HealthCheck => {
2088 debug!("Received health check message");
2089 Ok(None)
2090 }
2091 }
2092 }
2093
2094 async fn handle_remote_request(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
2096 match message.payload {
2097 MCPMessage::CallTool { name, arguments } => {
2098 let context = MCPCallContext {
2099 caller_id: message.source_peer.clone(),
2100 timestamp: SystemTime::now(),
2101 timeout: DEFAULT_CALL_TIMEOUT,
2102 auth_info: None,
2103 metadata: HashMap::new(),
2104 };
2105
2106 let result = self.call_tool(&name, arguments, context).await;
2108
2109 let response_payload = match result {
2111 Ok(value) => MCPMessage::CallToolResult {
2112 content: vec![MCPContent::Text {
2113 text: value.to_string(),
2114 }],
2115 is_error: false,
2116 },
2117 Err(e) => MCPMessage::Error {
2118 code: -1,
2119 message: e.to_string(),
2120 data: None,
2121 },
2122 };
2123
2124 let response_message = P2PMCPMessage {
2125 message_type: P2PMCPMessageType::Response,
2126 message_id: message.message_id,
2127 source_peer: self.get_node_id_string(),
2128 target_peer: Some(message.source_peer),
2129 timestamp: SystemTime::now()
2130 .duration_since(std::time::UNIX_EPOCH)
2131 .map_err(|e| {
2132 P2PError::Identity(crate::error::IdentityError::SystemTime(
2133 format!("Time error: {e}").into(),
2134 ))
2135 })?
2136 .as_secs(),
2137 payload: response_payload,
2138 ttl: message.ttl.saturating_sub(1),
2139 };
2140
2141 let response_data = serde_json::to_vec(&response_message)
2143 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2144
2145 Ok(Some(response_data))
2146 }
2147 MCPMessage::ListTools { cursor: _ } => {
2148 let (tools, _) = self.list_tools(None).await?;
2149
2150 let response_payload = MCPMessage::ListToolsResult {
2151 tools,
2152 next_cursor: None,
2153 };
2154
2155 let response_message = P2PMCPMessage {
2156 message_type: P2PMCPMessageType::Response,
2157 message_id: message.message_id,
2158 source_peer: self.get_node_id_string(),
2159 target_peer: Some(message.source_peer),
2160 timestamp: SystemTime::now()
2161 .duration_since(std::time::UNIX_EPOCH)
2162 .map_err(|e| {
2163 P2PError::Identity(crate::error::IdentityError::SystemTime(
2164 format!("Time error: {e}").into(),
2165 ))
2166 })?
2167 .as_secs(),
2168 payload: response_payload,
2169 ttl: message.ttl.saturating_sub(1),
2170 };
2171
2172 let response_data = serde_json::to_vec(&response_message)
2173 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2174
2175 Ok(Some(response_data))
2176 }
2177 _ => {
2178 let error_response = P2PMCPMessage {
2180 message_type: P2PMCPMessageType::Response,
2181 message_id: message.message_id,
2182 source_peer: self.get_node_id_string(),
2183 target_peer: Some(message.source_peer),
2184 timestamp: SystemTime::now()
2185 .duration_since(std::time::UNIX_EPOCH)
2186 .map_err(|e| {
2187 P2PError::Identity(crate::error::IdentityError::SystemTime(
2188 format!("Time error: {e}").into(),
2189 ))
2190 })?
2191 .as_secs(),
2192 payload: MCPMessage::Error {
2193 code: -2,
2194 message: "Unsupported request type".to_string(),
2195 data: None,
2196 },
2197 ttl: message.ttl.saturating_sub(1),
2198 };
2199
2200 let response_data = serde_json::to_vec(&error_response)
2201 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2202
2203 Ok(Some(response_data))
2204 }
2205 }
2206 }
2207
2208 pub async fn generate_auth_token(
2212 &self,
2213 peer_id: &PeerId,
2214 permissions: Vec<MCPPermission>,
2215 ttl: Duration,
2216 ) -> Result<String> {
2217 if let Some(security_manager) = &self.security_manager {
2218 let token = security_manager
2219 .generate_token(peer_id, permissions, ttl)
2220 .await?;
2221
2222 let mut details = HashMap::new();
2224 details.insert("action".to_string(), "token_generated".to_string());
2225 details.insert("ttl_seconds".to_string(), ttl.as_secs().to_string());
2226
2227 self.audit_logger
2228 .log_event(
2229 "authentication".to_string(),
2230 peer_id.clone(),
2231 details,
2232 AuditSeverity::Info,
2233 )
2234 .await;
2235
2236 Ok(token)
2237 } else {
2238 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2239 "Authentication not enabled".to_string().into(),
2240 )))
2241 }
2242 }
2243
2244 pub async fn verify_auth_token(&self, token: &str) -> Result<TokenPayload> {
2246 if let Some(security_manager) = &self.security_manager {
2247 match security_manager.verify_token(token).await {
2248 Ok(payload) => {
2249 let mut details = HashMap::new();
2251 details.insert("action".to_string(), "token_verified".to_string());
2252 details.insert("subject".to_string(), payload.sub.clone());
2253
2254 self.audit_logger
2255 .log_event(
2256 "authentication".to_string(),
2257 payload.iss.clone(),
2258 details,
2259 AuditSeverity::Info,
2260 )
2261 .await;
2262
2263 Ok(payload)
2264 }
2265 Err(e) => {
2266 let mut details = HashMap::new();
2268 details.insert(
2269 "action".to_string(),
2270 "token_verification_failed".to_string(),
2271 );
2272 details.insert("error".to_string(), e.to_string());
2273
2274 self.audit_logger
2275 .log_event(
2276 "authentication".to_string(),
2277 "unknown".to_string(),
2278 details,
2279 AuditSeverity::Warning,
2280 )
2281 .await;
2282
2283 Err(e)
2284 }
2285 }
2286 } else {
2287 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2288 "Authentication not enabled".to_string().into(),
2289 )))
2290 }
2291 }
2292
2293 pub async fn check_permission(
2295 &self,
2296 peer_id: &PeerId,
2297 permission: &MCPPermission,
2298 ) -> Result<bool> {
2299 if let Some(security_manager) = &self.security_manager {
2300 security_manager.check_permission(peer_id, permission).await
2301 } else {
2302 Ok(true)
2304 }
2305 }
2306
2307 pub async fn check_rate_limit(&self, peer_id: &PeerId) -> Result<bool> {
2309 if let Some(security_manager) = &self.security_manager {
2310 let allowed = security_manager.check_rate_limit(peer_id).await?;
2311
2312 if !allowed {
2313 let mut details = HashMap::new();
2315 details.insert("action".to_string(), "rate_limit_exceeded".to_string());
2316
2317 self.audit_logger
2318 .log_event(
2319 "rate_limiting".to_string(),
2320 peer_id.clone(),
2321 details,
2322 AuditSeverity::Warning,
2323 )
2324 .await;
2325 }
2326
2327 Ok(allowed)
2328 } else {
2329 Ok(true)
2331 }
2332 }
2333
2334 pub async fn grant_permission(
2336 &self,
2337 peer_id: &PeerId,
2338 permission: MCPPermission,
2339 ) -> Result<()> {
2340 if let Some(security_manager) = &self.security_manager {
2341 security_manager
2342 .grant_permission(peer_id, permission.clone())
2343 .await?;
2344
2345 let mut details = HashMap::new();
2347 details.insert("action".to_string(), "permission_granted".to_string());
2348 details.insert("permission".to_string(), permission.as_str().to_string());
2349
2350 self.audit_logger
2351 .log_event(
2352 "authorization".to_string(),
2353 peer_id.clone(),
2354 details,
2355 AuditSeverity::Info,
2356 )
2357 .await;
2358
2359 Ok(())
2360 } else {
2361 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2362 "Security not enabled".to_string().into(),
2363 )))
2364 }
2365 }
2366
2367 pub async fn revoke_permission(
2369 &self,
2370 peer_id: &PeerId,
2371 permission: &MCPPermission,
2372 ) -> Result<()> {
2373 if let Some(security_manager) = &self.security_manager {
2374 security_manager
2375 .revoke_permission(peer_id, permission)
2376 .await?;
2377
2378 let mut details = HashMap::new();
2380 details.insert("action".to_string(), "permission_revoked".to_string());
2381 details.insert("permission".to_string(), permission.as_str().to_string());
2382
2383 self.audit_logger
2384 .log_event(
2385 "authorization".to_string(),
2386 peer_id.clone(),
2387 details,
2388 AuditSeverity::Info,
2389 )
2390 .await;
2391
2392 Ok(())
2393 } else {
2394 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2395 "Security not enabled".to_string().into(),
2396 )))
2397 }
2398 }
2399
2400 pub async fn add_trusted_peer(&self, peer_id: PeerId) -> Result<()> {
2402 if let Some(security_manager) = &self.security_manager {
2403 security_manager.add_trusted_peer(peer_id.clone()).await?;
2404
2405 let mut details = HashMap::new();
2407 details.insert("action".to_string(), "trusted_peer_added".to_string());
2408
2409 self.audit_logger
2410 .log_event(
2411 "trust_management".to_string(),
2412 peer_id,
2413 details,
2414 AuditSeverity::Info,
2415 )
2416 .await;
2417
2418 Ok(())
2419 } else {
2420 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2421 "Security not enabled".to_string().into(),
2422 )))
2423 }
2424 }
2425
2426 pub async fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
2428 if let Some(security_manager) = &self.security_manager {
2429 security_manager.is_trusted_peer(peer_id).await
2430 } else {
2431 false
2432 }
2433 }
2434
2435 pub async fn set_tool_security_policy(
2437 &self,
2438 tool_name: String,
2439 level: SecurityLevel,
2440 ) -> Result<()> {
2441 if let Some(security_manager) = &self.security_manager {
2442 security_manager
2443 .set_tool_policy(tool_name.clone(), level.clone())
2444 .await?;
2445
2446 let mut details = HashMap::new();
2448 details.insert("action".to_string(), "tool_policy_set".to_string());
2449 details.insert("tool_name".to_string(), tool_name);
2450 details.insert("security_level".to_string(), format!("{level:?}"));
2451
2452 self.audit_logger
2453 .log_event(
2454 "security_policy".to_string(),
2455 "system".to_string(),
2456 details,
2457 AuditSeverity::Info,
2458 )
2459 .await;
2460
2461 Ok(())
2462 } else {
2463 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2464 "Security not enabled".to_string().into(),
2465 )))
2466 }
2467 }
2468
2469 pub async fn get_tool_security_policy(&self, tool_name: &str) -> SecurityLevel {
2471 if let Some(security_manager) = &self.security_manager {
2472 security_manager.get_tool_policy(tool_name).await
2473 } else {
2474 SecurityLevel::Public
2475 }
2476 }
2477
2478 pub async fn get_peer_security_stats(&self, peer_id: &PeerId) -> Option<PeerACL> {
2480 if let Some(security_manager) = &self.security_manager {
2481 security_manager.get_peer_stats(peer_id).await
2482 } else {
2483 None
2484 }
2485 }
2486
2487 pub async fn get_security_audit(&self, limit: Option<usize>) -> Vec<SecurityAuditEntry> {
2489 self.audit_logger.get_recent_entries(limit).await
2490 }
2491
2492 pub async fn security_cleanup(&self) -> Result<()> {
2494 if let Some(security_manager) = &self.security_manager {
2495 security_manager.cleanup().await?;
2496 }
2497 Ok(())
2498 }
2499
2500 async fn handle_remote_response(&self, message: P2PMCPMessage) -> Result<()> {
2502 let response_tx = {
2504 let mut handlers = self.request_handlers.write().await;
2505 handlers.remove(&message.message_id)
2506 };
2507
2508 if let Some(tx) = response_tx {
2509 let response = MCPResponse {
2510 request_id: message.message_id,
2511 message: message.payload,
2512 timestamp: SystemTime::now(),
2513 processing_time: Duration::from_millis(0), };
2515
2516 let _ = tx.send(response);
2518 } else {
2519 debug!(
2520 "Received response for unknown request: {}",
2521 message.message_id
2522 );
2523 }
2524
2525 Ok(())
2526 }
2527
2528 pub async fn announce_local_services(&self) -> Result<()> {
2530 if let Some(dht) = &self.dht {
2531 let local_service = self.create_local_service_announcement().await?;
2533
2534 self.store_service_in_dht(&local_service, dht).await?;
2536
2537 if let Some(network_sender) = &*self.network_sender.read().await {
2539 self.broadcast_service_announcement(&local_service, network_sender)
2540 .await?;
2541 }
2542
2543 info!(
2544 "Announced local MCP service with {} tools",
2545 local_service.tools.len()
2546 );
2547 }
2548
2549 Ok(())
2550 }
2551
2552 async fn create_local_service_announcement(&self) -> Result<MCPService> {
2554 let tools = self.tools.read().await;
2555 let tool_names: Vec<String> = tools.keys().cloned().collect();
2556
2557 let service = MCPService {
2558 service_id: format!("mcp-{}", self.config.server_name),
2559 node_id: "local".to_string(), tools: tool_names,
2561 capabilities: MCPCapabilities {
2562 experimental: None,
2563 sampling: None,
2564 tools: Some(MCPToolsCapability {
2565 list_changed: Some(true),
2566 }),
2567 prompts: None,
2568 resources: None,
2569 logging: None,
2570 },
2571 metadata: MCPServiceMetadata {
2572 name: self.config.server_name.clone(),
2573 version: self.config.server_version.clone(),
2574 description: Some("P2P MCP Service".to_string()),
2575 tags: vec!["p2p".to_string(), "mcp".to_string()],
2576 health_status: ServiceHealthStatus::Healthy,
2577 load_metrics: self.get_current_load_metrics().await,
2578 },
2579 registered_at: SystemTime::now(),
2580 endpoint: MCPEndpoint {
2581 protocol: "p2p".to_string(),
2582 address: "local".to_string(), port: None,
2584 tls: true,
2585 auth_required: false,
2586 },
2587 };
2588
2589 Ok(service)
2590 }
2591
2592 async fn get_current_load_metrics(&self) -> ServiceLoadMetrics {
2594 let stats = self.stats.read().await;
2595
2596 ServiceLoadMetrics {
2597 active_requests: self.request_handlers.read().await.len() as u32,
2598 requests_per_second: stats.total_requests as f64 / 60.0, avg_response_time_ms: stats.avg_response_time.as_millis() as f64,
2600 error_rate: if stats.total_requests > 0 {
2601 stats.total_errors as f64 / stats.total_requests as f64
2602 } else {
2603 0.0
2604 },
2605 cpu_usage: 0.0, memory_usage: 0, }
2608 }
2609
2610 async fn store_service_in_dht(
2612 &self,
2613 service: &MCPService,
2614 dht: &Arc<RwLock<DHT>>,
2615 ) -> Result<()> {
2616 let service_key = Key::new(format!("mcp:service:{}", service.service_id).as_bytes());
2618 let service_data = serde_json::to_vec(service)
2619 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2620
2621 let dht_guard = dht.write().await;
2622 dht_guard
2623 .put(service_key.clone(), service_data)
2624 .await
2625 .map_err(|e| {
2626 P2PError::Dht(crate::error::DhtError::StoreFailed(
2627 format!("{}: Failed to store service: {e}", service_key).into(),
2628 ))
2629 })?;
2630
2631 let services_key = Key::new(b"mcp:services:index");
2633 let mut service_ids = match dht_guard.get(&services_key).await {
2634 Some(record) => {
2635 serde_json::from_slice::<Vec<String>>(&record.value).unwrap_or_default()
2636 }
2637 None => Vec::new(),
2638 };
2639
2640 if !service_ids.contains(&service.service_id) {
2641 service_ids.push(service.service_id.clone());
2642
2643 let index_data = serde_json::to_vec(&service_ids)
2644 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2645
2646 dht_guard
2647 .put(services_key.clone(), index_data)
2648 .await
2649 .map_err(|e| {
2650 P2PError::Dht(crate::error::DhtError::StoreFailed(
2651 format!("{}: Failed to update services index: {e}", services_key).into(),
2652 ))
2653 })?;
2654 }
2655
2656 Ok(())
2657 }
2658
2659 async fn broadcast_service_announcement(
2661 &self,
2662 service: &MCPService,
2663 network_sender: &Arc<dyn NetworkSender>,
2664 ) -> Result<()> {
2665 let announcement = P2PMCPMessage {
2666 message_type: P2PMCPMessageType::ServiceAdvertisement,
2667 message_id: uuid::Uuid::new_v4().to_string(),
2668 source_peer: network_sender.local_peer_id().clone(),
2669 target_peer: None, timestamp: SystemTime::now()
2671 .duration_since(std::time::UNIX_EPOCH)
2672 .unwrap_or_default()
2673 .as_secs(),
2674 payload: MCPMessage::ListToolsResult {
2675 tools: service
2676 .tools
2677 .iter()
2678 .map(|tool_name| MCPTool {
2679 name: tool_name.clone(),
2680 description: format!("Tool from {}", service.metadata.name),
2681 input_schema: json!({"type": "object"}),
2682 })
2683 .collect(),
2684 next_cursor: None,
2685 },
2686 ttl: 3,
2687 };
2688
2689 let _announcement_data = serde_json::to_vec(&announcement)
2690 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2691
2692 debug!(
2695 "Service announcement prepared for broadcast: {} tools",
2696 service.tools.len()
2697 );
2698
2699 Ok(())
2700 }
2701
2702 pub async fn discover_remote_services(&self) -> Result<Vec<MCPService>> {
2704 if let Some(dht) = &self.dht {
2705 let services_key = Key::new(b"mcp:services:index");
2706 let dht_guard = dht.read().await;
2707
2708 let service_ids = match dht_guard.get(&services_key).await {
2709 Some(record) => {
2710 serde_json::from_slice::<Vec<String>>(&record.value).unwrap_or_default()
2711 }
2712 None => {
2713 debug!("No services index found in DHT");
2714 return Ok(Vec::new());
2715 }
2716 };
2717
2718 let mut discovered_services = Vec::new();
2719
2720 for service_id in service_ids {
2721 let service_key = Key::new(format!("mcp:service:{service_id}").as_bytes());
2722
2723 if let Some(record) = dht_guard.get(&service_key).await {
2724 match serde_json::from_slice::<MCPService>(&record.value) {
2725 Ok(service) => {
2726 if service.service_id != format!("mcp-{}", self.config.server_name) {
2728 discovered_services.push(service);
2729 }
2730 }
2731 Err(e) => {
2732 warn!("Failed to deserialize service {}: {}", service_id, e);
2733 }
2734 }
2735 }
2736 }
2737
2738 debug!(
2739 "Discovered {} remote MCP services",
2740 discovered_services.len()
2741 );
2742 Ok(discovered_services)
2743 } else {
2744 Ok(Vec::new())
2745 }
2746 }
2747
2748 pub async fn refresh_service_discovery(&self) -> Result<()> {
2750 let discovered_services = self.discover_remote_services().await?;
2751
2752 {
2754 let mut remote_cache = self.remote_services.write().await;
2755 remote_cache.clear();
2756
2757 for service in discovered_services {
2758 remote_cache.insert(service.service_id.clone(), service);
2759 }
2760 }
2761
2762 {
2764 let local_service = self.create_local_service_announcement().await?;
2765 let mut local_cache = self.local_services.write().await;
2766 local_cache.insert(local_service.service_id.clone(), local_service);
2767 }
2768
2769 debug!("Service discovery refresh completed");
2770 Ok(())
2771 }
2772
2773 pub async fn get_all_services(&self) -> Result<Vec<MCPService>> {
2775 let mut all_services = Vec::new();
2776
2777 {
2779 let local_services = self.local_services.read().await;
2780 all_services.extend(local_services.values().cloned());
2781 }
2782
2783 {
2785 let remote_services = self.remote_services.read().await;
2786 all_services.extend(remote_services.values().cloned());
2787 }
2788
2789 Ok(all_services)
2790 }
2791
2792 pub async fn find_services_with_tool(&self, tool_name: &str) -> Result<Vec<MCPService>> {
2794 let all_services = self.get_all_services().await?;
2795
2796 let matching_services = all_services
2797 .into_iter()
2798 .filter(|service| service.tools.contains(&tool_name.to_string()))
2799 .collect();
2800
2801 Ok(matching_services)
2802 }
2803
2804 pub async fn handle_service_advertisement(&self, message: P2PMCPMessage) -> Result<()> {
2806 debug!(
2807 "Received service advertisement from peer: {}",
2808 message.source_peer
2809 );
2810
2811 if let MCPMessage::ListToolsResult { tools, .. } = message.payload {
2813 let service = MCPService {
2815 service_id: format!("mcp-{}", message.source_peer),
2816 node_id: message.source_peer.clone(),
2817 tools: tools.iter().map(|t| t.name.clone()).collect(),
2818 capabilities: MCPCapabilities {
2819 experimental: None,
2820 sampling: None,
2821 tools: Some(MCPToolsCapability {
2822 list_changed: Some(true),
2823 }),
2824 prompts: None,
2825 resources: None,
2826 logging: None,
2827 },
2828 metadata: MCPServiceMetadata {
2829 name: format!("Remote MCP Service - {}", message.source_peer),
2830 version: "unknown".to_string(),
2831 description: Some("Remote P2P MCP Service".to_string()),
2832 tags: vec!["p2p".to_string(), "remote".to_string()],
2833 health_status: ServiceHealthStatus::Healthy,
2834 load_metrics: ServiceLoadMetrics {
2835 active_requests: 0,
2836 requests_per_second: 0.0,
2837 avg_response_time_ms: 0.0,
2838 error_rate: 0.0,
2839 cpu_usage: 0.0,
2840 memory_usage: 0,
2841 },
2842 },
2843 registered_at: SystemTime::now(),
2844 endpoint: MCPEndpoint {
2845 protocol: "p2p".to_string(),
2846 address: message.source_peer.clone(),
2847 port: None,
2848 tls: true,
2849 auth_required: false,
2850 },
2851 };
2852
2853 {
2855 let mut remote_services = self.remote_services.write().await;
2856 remote_services.insert(service.service_id.clone(), service.clone());
2857 }
2858
2859 if let Some(dht) = &self.dht
2861 && let Err(e) = self.store_service_in_dht(&service, dht).await
2862 {
2863 warn!("Failed to store remote service in DHT: {}", e);
2864 }
2865
2866 info!(
2867 "Registered remote MCP service from {} with {} tools",
2868 message.source_peer,
2869 tools.len()
2870 );
2871 }
2872
2873 Ok(())
2874 }
2875
2876 pub async fn handle_service_discovery(
2878 &self,
2879 message: P2PMCPMessage,
2880 ) -> Result<Option<Vec<u8>>> {
2881 let local_services: Vec<MCPService> = {
2883 let services = self.local_services.read().await;
2884 services.values().cloned().collect()
2885 };
2886
2887 if !local_services.is_empty() {
2888 let advertisement = P2PMCPMessage {
2889 message_type: P2PMCPMessageType::ServiceAdvertisement,
2890 message_id: uuid::Uuid::new_v4().to_string(),
2891 source_peer: self.get_node_id_string(),
2892 target_peer: Some(message.source_peer),
2893 timestamp: SystemTime::now()
2894 .duration_since(std::time::UNIX_EPOCH)
2895 .map_err(|e| {
2896 P2PError::Identity(crate::error::IdentityError::SystemTime(
2897 format!("Time error: {e}").into(),
2898 ))
2899 })?
2900 .as_secs(),
2901 payload: MCPMessage::ListToolsResult {
2902 tools: local_services
2903 .into_iter()
2904 .flat_map(|s| {
2905 s.tools.into_iter().map(|t| MCPTool {
2906 name: t,
2907 description: "Remote tool".to_string(),
2908 input_schema: json!({"type": "object"}),
2909 })
2910 })
2911 .collect(),
2912 next_cursor: None,
2913 },
2914 ttl: message.ttl.saturating_sub(1),
2915 };
2916
2917 let response_data = serde_json::to_vec(&advertisement)
2918 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2919
2920 Ok(Some(response_data))
2921 } else {
2922 Ok(None)
2923 }
2924 }
2925
2926 pub async fn shutdown(&self) -> Result<()> {
2928 info!("Shutting down MCP server");
2929
2930 {
2932 let mut sessions = self.sessions.write().await;
2933 for session in sessions.values_mut() {
2934 session.state = MCPSessionState::Terminated;
2935 }
2936 sessions.clear();
2937 }
2938
2939 info!("MCP server shutdown complete");
2942 Ok(())
2943 }
2944}
2945
2946impl Tool {
2947 pub fn builder(name: &str, description: &str, input_schema: Value) -> ToolBuilder {
2949 ToolBuilder {
2950 name: name.to_string(),
2951 description: description.to_string(),
2952 input_schema,
2953 handler: None,
2954 tags: Vec::new(),
2955 }
2956 }
2957}
2958
2959pub struct ToolBuilder {
2961 name: String,
2962 description: String,
2963 input_schema: Value,
2964 handler: Option<Box<dyn ToolHandler + Send + Sync>>,
2965 tags: Vec<String>,
2966}
2967
2968impl ToolBuilder {
2969 pub fn handler<H: ToolHandler + Send + Sync + 'static>(mut self, handler: H) -> Self {
2971 self.handler = Some(Box::new(handler));
2972 self
2973 }
2974
2975 pub fn tags(mut self, tags: Vec<String>) -> Self {
2977 self.tags = tags;
2978 self
2979 }
2980
2981 pub fn build(self) -> Result<Tool> {
2983 let handler = self.handler.ok_or_else(|| {
2984 P2PError::Mcp(crate::error::McpError::InvalidRequest(
2985 "Tool handler is required".to_string().into(),
2986 ))
2987 })?;
2988
2989 let definition = MCPTool {
2990 name: self.name,
2991 description: self.description,
2992 input_schema: self.input_schema,
2993 };
2994
2995 let metadata = ToolMetadata {
2996 created_at: SystemTime::now(),
2997 last_called: None,
2998 call_count: 0,
2999 avg_execution_time: Duration::from_millis(0),
3000 health_status: ToolHealthStatus::Healthy,
3001 tags: self.tags,
3002 };
3003
3004 Ok(Tool {
3005 definition,
3006 handler,
3007 metadata,
3008 })
3009 }
3010}
3011
3012pub struct FunctionToolHandler<F> {
3014 function: F,
3015}
3016
3017impl<F, Fut> ToolHandler for FunctionToolHandler<F>
3018where
3019 F: Fn(Value) -> Fut + Send + Sync,
3020 Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
3021{
3022 fn execute(
3023 &self,
3024 arguments: Value,
3025 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>> {
3026 Box::pin((self.function)(arguments))
3027 }
3028}
3029
3030impl<F> FunctionToolHandler<F> {
3031 pub fn new(function: F) -> Self {
3033 Self { function }
3034 }
3035}
3036
3037impl MCPService {
3039 pub fn new(service_id: String, node_id: PeerId) -> Self {
3041 Self {
3042 service_id,
3043 node_id,
3044 tools: Vec::new(),
3045 capabilities: MCPCapabilities {
3046 experimental: None,
3047 sampling: None,
3048 tools: Some(MCPToolsCapability {
3049 list_changed: Some(true),
3050 }),
3051 prompts: None,
3052 resources: None,
3053 logging: None,
3054 },
3055 metadata: MCPServiceMetadata {
3056 name: "MCP Service".to_string(),
3057 version: "1.0.0".to_string(),
3058 description: None,
3059 tags: Vec::new(),
3060 health_status: ServiceHealthStatus::Healthy,
3061 load_metrics: ServiceLoadMetrics {
3062 active_requests: 0,
3063 requests_per_second: 0.0,
3064 avg_response_time_ms: 0.0,
3065 error_rate: 0.0,
3066 cpu_usage: 0.0,
3067 memory_usage: 0,
3068 },
3069 },
3070 registered_at: SystemTime::now(),
3071 endpoint: MCPEndpoint {
3072 protocol: "p2p".to_string(),
3073 address: "".to_string(),
3074 port: None,
3075 tls: false,
3076 auth_required: false,
3077 },
3078 }
3079 }
3080}
3081
3082impl Default for MCPCapabilities {
3083 fn default() -> Self {
3084 Self {
3085 experimental: None,
3086 sampling: None,
3087 tools: Some(MCPToolsCapability {
3088 list_changed: Some(true),
3089 }),
3090 prompts: Some(MCPPromptsCapability {
3091 list_changed: Some(true),
3092 }),
3093 resources: Some(MCPResourcesCapability {
3094 subscribe: Some(true),
3095 list_changed: Some(true),
3096 }),
3097 logging: Some(MCPLoggingCapability {
3098 levels: Some(vec![
3099 MCPLogLevel::Debug,
3100 MCPLogLevel::Info,
3101 MCPLogLevel::Warning,
3102 MCPLogLevel::Error,
3103 ]),
3104 }),
3105 }
3106 }
3107}
3108
3109#[cfg(test)]
3110mod tests {
3111 use super::*;
3112 use crate::dht::{DHT, DHTConfig, Key};
3113 use std::future::Future;
3114 use std::pin::Pin;
3115 use std::time::UNIX_EPOCH;
3116 use tokio::time::timeout;
3117
3118 struct TestTool {
3120 name: String,
3121 should_error: bool,
3122 execution_time: Duration,
3123 }
3124
3125 impl TestTool {
3126 fn new(name: &str) -> Self {
3127 Self {
3128 name: name.to_string(),
3129 should_error: false,
3130 execution_time: Duration::from_millis(10),
3131 }
3132 }
3133
3134 fn with_error(mut self) -> Self {
3135 self.should_error = true;
3136 self
3137 }
3138
3139 fn with_execution_time(mut self, duration: Duration) -> Self {
3140 self.execution_time = duration;
3141 self
3142 }
3143 }
3144
3145 impl ToolHandler for TestTool {
3146 fn execute(
3147 &self,
3148 arguments: Value,
3149 ) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
3150 let should_error = self.should_error;
3151 let execution_time = self.execution_time;
3152 let name = self.name.clone();
3153
3154 Box::pin(async move {
3155 tokio::time::sleep(execution_time).await;
3156
3157 if should_error {
3158 return Err(P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
3159 format!("{}: Test error", name).into(),
3160 ))
3161 .into());
3162 }
3163
3164 Ok(json!({
3166 "tool": name,
3167 "arguments": arguments,
3168 "result": "success"
3169 }))
3170 })
3171 }
3172
3173 fn validate(&self, arguments: &Value) -> Result<()> {
3174 if !arguments.is_object() {
3175 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
3176 "Arguments must be an object".to_string().into(),
3177 ))
3178 .into());
3179 }
3180 Ok(())
3181 }
3182
3183 fn get_requirements(&self) -> ToolRequirements {
3184 ToolRequirements {
3185 max_memory: Some(1024 * 1024), max_execution_time: Some(Duration::from_secs(5)),
3187 required_capabilities: vec!["test".to_string()],
3188 requires_network: false,
3189 requires_filesystem: false,
3190 }
3191 }
3192 }
3193
3194 async fn create_test_mcp_server() -> MCPServer {
3196 let config = MCPServerConfig {
3197 server_name: "test_server".to_string(),
3198 server_version: "1.0.0".to_string(),
3199 enable_auth: false,
3200 enable_rate_limiting: false,
3201 max_concurrent_requests: 10,
3202 request_timeout: Duration::from_secs(30),
3203 enable_dht_discovery: true,
3204 rate_limit_rpm: 60,
3205 enable_logging: true,
3206 max_tool_execution_time: Duration::from_secs(30),
3207 tool_memory_limit: 100 * 1024 * 1024,
3208 health_monitor: HealthMonitorConfig::default(),
3209 };
3210
3211 MCPServer::new(config)
3212 }
3213
3214 fn create_test_tool(name: &str) -> Tool {
3216 Tool {
3217 definition: MCPTool {
3218 name: name.to_string(),
3219 description: format!("Test tool: {}", name).into(),
3220 input_schema: json!({
3221 "type": "object",
3222 "properties": {
3223 "input": { "type": "string" }
3224 }
3225 }),
3226 },
3227 handler: Box::new(TestTool::new(name)),
3228 metadata: ToolMetadata {
3229 created_at: SystemTime::now(),
3230 last_called: None,
3231 call_count: 0,
3232 avg_execution_time: Duration::from_millis(0),
3233 health_status: ToolHealthStatus::Healthy,
3234 tags: vec!["test".to_string()],
3235 },
3236 }
3237 }
3238
3239 async fn create_test_dht() -> DHT {
3241 let local_id = Key::new(b"test_node_id");
3242 let config = DHTConfig::default();
3243 DHT::new(local_id, config)
3244 }
3245
3246 fn create_test_context(caller_id: PeerId) -> MCPCallContext {
3248 MCPCallContext {
3249 caller_id,
3250 timestamp: SystemTime::now(),
3251 timeout: Duration::from_secs(30),
3252 auth_info: None,
3253 metadata: HashMap::new(),
3254 }
3255 }
3256
3257 #[tokio::test]
3258 async fn test_mcp_server_creation() {
3259 let server = create_test_mcp_server().await;
3260 assert_eq!(server.config.server_name, "test_server");
3261 assert_eq!(server.config.server_version, "1.0.0");
3262 assert!(!server.config.enable_auth);
3263 assert!(!server.config.enable_rate_limiting);
3264 }
3265
3266 #[tokio::test]
3267 async fn test_tool_registration() -> Result<()> {
3268 let server = create_test_mcp_server().await;
3269 let tool = create_test_tool("test_calculator");
3270
3271 server.register_tool(tool).await?;
3273
3274 let tools = server.tools.read().await;
3276 assert!(tools.contains_key("test_calculator"));
3277 assert_eq!(
3278 tools
3279 .get("test_calculator")
3280 .expect("Should succeed in test")
3281 .definition
3282 .name,
3283 "test_calculator"
3284 );
3285
3286 let stats = server.stats.read().await;
3288 assert_eq!(stats.total_tools, 1);
3289
3290 Ok(())
3291 }
3292
3293 #[tokio::test]
3294 async fn test_tool_registration_duplicate() -> Result<()> {
3295 let server = create_test_mcp_server().await;
3296 let tool1 = create_test_tool("duplicate_tool");
3297 let tool2 = create_test_tool("duplicate_tool");
3298
3299 server.register_tool(tool1).await?;
3301
3302 let result = server.register_tool(tool2).await;
3304 assert!(result.is_err());
3305 assert!(
3306 result
3307 .unwrap_err()
3308 .to_string()
3309 .contains("Tool already exists")
3310 );
3311
3312 Ok(())
3313 }
3314
3315 #[tokio::test]
3316 async fn test_tool_validation() {
3317 let server = create_test_mcp_server().await;
3318
3319 let mut invalid_tool = create_test_tool("");
3321 let result = server.validate_tool(&invalid_tool).await;
3322 assert!(result.is_err());
3323
3324 invalid_tool.definition.name = "a".repeat(200);
3326 let result = server.validate_tool(&invalid_tool).await;
3327 assert!(result.is_err());
3328
3329 let mut invalid_schema_tool = create_test_tool("valid_name");
3331 invalid_schema_tool.definition.input_schema = json!("not an object");
3332 let result = server.validate_tool(&invalid_schema_tool).await;
3333 assert!(result.is_err());
3334
3335 let valid_tool = create_test_tool("valid_tool");
3337 let result = server.validate_tool(&valid_tool).await;
3338 assert!(result.is_ok());
3339 }
3340
3341 #[tokio::test]
3342 async fn test_tool_call_success() -> Result<()> {
3343 let server = create_test_mcp_server().await;
3344 let tool = create_test_tool("success_tool");
3345 server.register_tool(tool).await?;
3346
3347 let caller_id = "test_peer_123".to_string();
3348 let context = create_test_context(caller_id);
3349 let arguments = json!({"input": "test data"});
3350
3351 let result = server
3352 .call_tool("success_tool", arguments.clone(), context)
3353 .await?;
3354
3355 assert_eq!(result["tool"], "success_tool");
3357 assert_eq!(result["arguments"], arguments);
3358 assert_eq!(result["result"], "success");
3359
3360 let tools = server.tools.read().await;
3362 let tool_metadata = &tools
3363 .get("success_tool")
3364 .ok_or_else(|| {
3365 P2PError::Mcp(crate::error::McpError::ToolNotFound(
3366 "Tool not found".into(),
3367 ))
3368 })?
3369 .metadata;
3370 assert_eq!(tool_metadata.call_count, 1);
3371 assert!(tool_metadata.last_called.is_some());
3372
3373 Ok(())
3374 }
3375
3376 #[tokio::test]
3377 async fn test_tool_call_nonexistent() -> Result<()> {
3378 let server = create_test_mcp_server().await;
3379 let caller_id = "test_peer_456".to_string();
3380 let context = create_test_context(caller_id);
3381 let arguments = json!({"input": "test"});
3382
3383 let result = server
3384 .call_tool("nonexistent_tool", arguments, context)
3385 .await;
3386 assert!(result.is_err());
3387 assert!(result.unwrap_err().to_string().contains("Tool not found"));
3388
3389 Ok(())
3390 }
3391
3392 #[tokio::test]
3393 async fn test_tool_call_handler_error() -> Result<()> {
3394 let server = create_test_mcp_server().await;
3395 let tool = Tool {
3396 definition: MCPTool {
3397 name: "error_tool".to_string(),
3398 description: "Tool that always errors".to_string(),
3399 input_schema: json!({"type": "object"}),
3400 },
3401 handler: Box::new(TestTool::new("error_tool").with_error()),
3402 metadata: ToolMetadata {
3403 created_at: SystemTime::now(),
3404 last_called: None,
3405 call_count: 0,
3406 avg_execution_time: Duration::from_millis(0),
3407 health_status: ToolHealthStatus::Healthy,
3408 tags: vec![],
3409 },
3410 };
3411
3412 server.register_tool(tool).await?;
3413
3414 let caller_id = "test_peer_error".to_string();
3415 let context = create_test_context(caller_id);
3416 let arguments = json!({"input": "test"});
3417
3418 let result = server.call_tool("error_tool", arguments, context).await;
3419 assert!(result.is_err());
3420 assert!(
3421 result
3422 .unwrap_err()
3423 .to_string()
3424 .contains("Test error from tool error_tool")
3425 );
3426
3427 Ok(())
3428 }
3429
3430 #[tokio::test]
3431 async fn test_tool_call_timeout() -> Result<()> {
3432 let server = create_test_mcp_server().await;
3433 let slow_tool = Tool {
3434 definition: MCPTool {
3435 name: "slow_tool".to_string(),
3436 description: "Tool that takes too long".to_string(),
3437 input_schema: json!({"type": "object"}),
3438 },
3439 handler: Box::new(
3440 TestTool::new("slow_tool").with_execution_time(Duration::from_secs(2)),
3441 ),
3442 metadata: ToolMetadata {
3443 created_at: SystemTime::now(),
3444 last_called: None,
3445 call_count: 0,
3446 avg_execution_time: Duration::from_millis(0),
3447 health_status: ToolHealthStatus::Healthy,
3448 tags: vec![],
3449 },
3450 };
3451
3452 server.register_tool(slow_tool).await?;
3453
3454 let caller_id = "test_peer_error".to_string();
3455 let context = create_test_context(caller_id);
3456 let arguments = json!({"input": "test"});
3457
3458 let result = timeout(
3460 Duration::from_millis(100),
3461 server.call_tool("slow_tool", arguments, context),
3462 )
3463 .await;
3464
3465 assert!(result.is_err()); Ok(())
3468 }
3469
3470 #[tokio::test]
3471 async fn test_tool_requirements() {
3472 let tool = TestTool::new("req_tool");
3473 let requirements = tool.get_requirements();
3474
3475 assert_eq!(requirements.max_memory, Some(1024 * 1024));
3476 assert_eq!(
3477 requirements.max_execution_time,
3478 Some(Duration::from_secs(5))
3479 );
3480 assert_eq!(requirements.required_capabilities, vec!["test"]);
3481 assert!(!requirements.requires_network);
3482 assert!(!requirements.requires_filesystem);
3483 }
3484
3485 #[tokio::test]
3486 async fn test_tool_validation_handler() {
3487 let tool = TestTool::new("validation_tool");
3488
3489 let valid_args = json!({"key": "value"});
3491 assert!(tool.validate(&valid_args).is_ok());
3492
3493 let invalid_args = json!("not an object");
3495 assert!(tool.validate(&invalid_args).is_err());
3496
3497 let invalid_args = json!(123);
3498 assert!(tool.validate(&invalid_args).is_err());
3499 }
3500
3501 #[tokio::test]
3502 async fn test_tool_health_status() {
3503 let mut metadata = ToolMetadata {
3504 created_at: SystemTime::now(),
3505 last_called: None,
3506 call_count: 0,
3507 avg_execution_time: Duration::from_millis(0),
3508 health_status: ToolHealthStatus::Healthy,
3509 tags: vec![],
3510 };
3511
3512 assert_eq!(metadata.health_status, ToolHealthStatus::Healthy);
3514
3515 metadata.health_status = ToolHealthStatus::Degraded;
3516 assert_eq!(metadata.health_status, ToolHealthStatus::Degraded);
3517
3518 metadata.health_status = ToolHealthStatus::Unhealthy;
3519 assert_eq!(metadata.health_status, ToolHealthStatus::Unhealthy);
3520
3521 metadata.health_status = ToolHealthStatus::Disabled;
3522 assert_eq!(metadata.health_status, ToolHealthStatus::Disabled);
3523 }
3524
3525 #[tokio::test]
3526 async fn test_mcp_capabilities() {
3527 let server = create_test_mcp_server().await;
3528 let capabilities = server.get_server_capabilities().await;
3529
3530 assert!(capabilities.tools.is_some());
3531 assert!(capabilities.prompts.is_some());
3532 assert!(capabilities.resources.is_some());
3533 assert!(capabilities.logging.is_some());
3534
3535 let tools_cap = capabilities.tools.expect("Test assertion failed");
3536 assert_eq!(tools_cap.list_changed, Some(true));
3537
3538 let logging_cap = capabilities.logging.expect("Test assertion failed");
3539 let levels = logging_cap.levels.expect("Test assertion failed");
3540 assert!(levels.contains(&MCPLogLevel::Debug));
3541 assert!(levels.contains(&MCPLogLevel::Info));
3542 assert!(levels.contains(&MCPLogLevel::Warning));
3543 assert!(levels.contains(&MCPLogLevel::Error));
3544 }
3545
3546 #[tokio::test]
3547 async fn test_mcp_message_serialization() {
3548 let init_msg = MCPMessage::Initialize {
3550 protocol_version: MCP_VERSION.to_string(),
3551 capabilities: MCPCapabilities {
3552 experimental: None,
3553 sampling: None,
3554 tools: Some(MCPToolsCapability {
3555 list_changed: Some(true),
3556 }),
3557 prompts: None,
3558 resources: None,
3559 logging: None,
3560 },
3561 client_info: MCPClientInfo {
3562 name: "test_client".to_string(),
3563 version: "1.0.0".to_string(),
3564 },
3565 };
3566
3567 let serialized = serde_json::to_string(&init_msg).expect("Test assertion failed");
3568 let deserialized: MCPMessage =
3569 serde_json::from_str(&serialized).expect("Test assertion failed");
3570
3571 match deserialized {
3572 MCPMessage::Initialize {
3573 protocol_version,
3574 client_info,
3575 ..
3576 } => {
3577 assert_eq!(protocol_version, MCP_VERSION);
3578 assert_eq!(client_info.name, "test_client");
3579 assert_eq!(client_info.version, "1.0.0");
3580 }
3581 _ => panic!("Wrong message type after deserialization"),
3582 }
3583 }
3584
3585 #[tokio::test]
3586 async fn test_mcp_content_types() {
3587 let text_content = MCPContent::Text {
3589 text: "Hello, world!".to_string(),
3590 };
3591
3592 let serialized = serde_json::to_string(&text_content).expect("Test assertion failed");
3593 let deserialized: MCPContent =
3594 serde_json::from_str(&serialized).expect("Test assertion failed");
3595
3596 match deserialized {
3597 MCPContent::Text { text } => assert_eq!(text, "Hello, world!"),
3598 _ => panic!("Wrong content type"),
3599 }
3600
3601 let image_content = MCPContent::Image {
3603 data: "base64data".to_string(),
3604 mime_type: "image/png".to_string(),
3605 };
3606
3607 let serialized = serde_json::to_string(&image_content).expect("Test assertion failed");
3608 let deserialized: MCPContent =
3609 serde_json::from_str(&serialized).expect("Test assertion failed");
3610
3611 match deserialized {
3612 MCPContent::Image { data, mime_type } => {
3613 assert_eq!(data, "base64data");
3614 assert_eq!(mime_type, "image/png");
3615 }
3616 _ => panic!("Wrong content type"),
3617 }
3618 }
3619
3620 #[tokio::test]
3621 async fn test_service_health_status() {
3622 let mut metrics = ServiceLoadMetrics {
3623 active_requests: 0,
3624 requests_per_second: 0.0,
3625 avg_response_time_ms: 0.0,
3626 error_rate: 0.0,
3627 cpu_usage: 0.0,
3628 memory_usage: 0,
3629 };
3630
3631 let metadata = MCPServiceMetadata {
3633 name: "test_service".to_string(),
3634 version: "1.0.0".to_string(),
3635 description: Some("Test service".to_string()),
3636 tags: vec!["test".to_string()],
3637 health_status: ServiceHealthStatus::Healthy,
3638 load_metrics: metrics.clone(),
3639 };
3640
3641 assert_eq!(metadata.health_status, ServiceHealthStatus::Healthy);
3642
3643 metrics.error_rate = 0.5; let degraded_metadata = MCPServiceMetadata {
3646 health_status: ServiceHealthStatus::Degraded,
3647 load_metrics: metrics.clone(),
3648 ..metadata.clone()
3649 };
3650
3651 assert_eq!(
3652 degraded_metadata.health_status,
3653 ServiceHealthStatus::Degraded
3654 );
3655
3656 let unhealthy_metadata = MCPServiceMetadata {
3657 health_status: ServiceHealthStatus::Unhealthy,
3658 ..metadata.clone()
3659 };
3660
3661 assert_eq!(
3662 unhealthy_metadata.health_status,
3663 ServiceHealthStatus::Unhealthy
3664 );
3665 }
3666
3667 #[tokio::test]
3668 async fn test_p2p_mcp_message() {
3669 let source_peer = "source_peer_123".to_string();
3670 let target_peer = "target_peer_456".to_string();
3671
3672 let p2p_message = P2PMCPMessage {
3673 message_type: P2PMCPMessageType::Request,
3674 message_id: uuid::Uuid::new_v4().to_string(),
3675 source_peer: source_peer.clone(),
3676 target_peer: Some(target_peer.clone()),
3677 timestamp: SystemTime::now()
3678 .duration_since(UNIX_EPOCH)
3679 .unwrap_or_else(|_| Duration::from_secs(0))
3680 .as_secs(),
3681 payload: MCPMessage::ListTools { cursor: None },
3682 ttl: 10,
3683 };
3684
3685 let serialized = serde_json::to_string(&p2p_message).expect("Test assertion failed");
3687 let deserialized: P2PMCPMessage =
3688 serde_json::from_str(&serialized).expect("Test assertion failed");
3689
3690 assert_eq!(deserialized.message_type, P2PMCPMessageType::Request);
3691 assert_eq!(deserialized.source_peer, source_peer);
3692 assert_eq!(deserialized.target_peer, Some(target_peer));
3693 assert_eq!(deserialized.ttl, 10);
3694
3695 match deserialized.payload {
3696 MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3697 _ => panic!("Wrong message payload type"),
3698 }
3699 }
3700
3701 #[tokio::test]
3702 async fn test_tool_requirements_default() {
3703 let default_requirements = ToolRequirements::default();
3704
3705 assert_eq!(default_requirements.max_memory, Some(100 * 1024 * 1024));
3706 assert_eq!(
3707 default_requirements.max_execution_time,
3708 Some(Duration::from_secs(30))
3709 );
3710 assert!(default_requirements.required_capabilities.is_empty());
3711 assert!(!default_requirements.requires_network);
3712 assert!(!default_requirements.requires_filesystem);
3713 }
3714
3715 #[tokio::test]
3716 async fn test_mcp_server_stats() -> Result<()> {
3717 let server = create_test_mcp_server().await;
3718
3719 let stats = server.stats.read().await;
3721 assert_eq!(stats.total_tools, 0);
3722 assert_eq!(stats.total_requests, 0);
3723 assert_eq!(stats.total_responses, 0);
3724 assert_eq!(stats.total_errors, 0);
3725
3726 drop(stats);
3727
3728 let tool = create_test_tool("stats_test_tool");
3730 server.register_tool(tool).await?;
3731
3732 let stats = server.stats.read().await;
3733 assert_eq!(stats.total_tools, 1);
3734
3735 Ok(())
3736 }
3737
3738 #[tokio::test]
3739 async fn test_log_levels() {
3740 let levels = vec![
3742 MCPLogLevel::Debug,
3743 MCPLogLevel::Info,
3744 MCPLogLevel::Notice,
3745 MCPLogLevel::Warning,
3746 MCPLogLevel::Error,
3747 MCPLogLevel::Critical,
3748 MCPLogLevel::Alert,
3749 MCPLogLevel::Emergency,
3750 ];
3751
3752 for level in levels {
3753 let serialized = serde_json::to_string(&level).expect("Test assertion failed");
3754 let deserialized: MCPLogLevel =
3755 serde_json::from_str(&serialized).expect("Test assertion failed");
3756 assert_eq!(level as u8, deserialized as u8);
3757 }
3758 }
3759
3760 #[tokio::test]
3761 async fn test_mcp_endpoint() {
3762 let endpoint = MCPEndpoint {
3763 protocol: "p2p".to_string(),
3764 address: "127.0.0.1".to_string(),
3765 port: Some(9000),
3766 tls: true,
3767 auth_required: true,
3768 };
3769
3770 let serialized = serde_json::to_string(&endpoint).expect("Test assertion failed");
3771 let deserialized: MCPEndpoint =
3772 serde_json::from_str(&serialized).expect("Test assertion failed");
3773
3774 assert_eq!(deserialized.protocol, "p2p");
3775 assert_eq!(deserialized.address, "127.0.0.1");
3776 assert_eq!(deserialized.port, Some(9000));
3777 assert!(deserialized.tls);
3778 assert!(deserialized.auth_required);
3779 }
3780
3781 #[tokio::test]
3782 async fn test_mcp_service_metadata() {
3783 let load_metrics = ServiceLoadMetrics {
3784 active_requests: 5,
3785 requests_per_second: 10.5,
3786 avg_response_time_ms: 250.0,
3787 error_rate: 0.01,
3788 cpu_usage: 45.5,
3789 memory_usage: 1024 * 1024 * 100, };
3791
3792 let metadata = MCPServiceMetadata {
3793 name: "test_service".to_string(),
3794 version: "2.1.0".to_string(),
3795 description: Some("A test service for unit testing".to_string()),
3796 tags: vec!["test".to_string(), "unit".to_string(), "mcp".to_string()],
3797 health_status: ServiceHealthStatus::Healthy,
3798 load_metrics,
3799 };
3800
3801 let serialized = serde_json::to_string(&metadata).expect("Test assertion failed");
3803 let deserialized: MCPServiceMetadata =
3804 serde_json::from_str(&serialized).expect("Test assertion failed");
3805
3806 assert_eq!(deserialized.name, "test_service");
3807 assert_eq!(deserialized.version, "2.1.0");
3808 assert_eq!(
3809 deserialized.description,
3810 Some("A test service for unit testing".to_string())
3811 );
3812 assert_eq!(deserialized.tags, vec!["test", "unit", "mcp"]);
3813 assert_eq!(deserialized.health_status, ServiceHealthStatus::Healthy);
3814 assert_eq!(deserialized.load_metrics.active_requests, 5);
3815 assert_eq!(deserialized.load_metrics.requests_per_second, 10.5);
3816 }
3817
3818 #[tokio::test]
3819 async fn test_function_tool_handler() -> Result<()> {
3820 let handler = FunctionToolHandler::new(|args: Value| async move {
3822 let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("world");
3823 Ok(json!({"greeting": format!("Hello, {}!", name)}))
3824 });
3825
3826 let args = json!({"name": "Alice"});
3827 let result = handler.execute(args).await?;
3828 assert_eq!(result["greeting"], "Hello, Alice!");
3829
3830 let empty_args = json!({});
3832 let result = handler.execute(empty_args).await?;
3833 assert_eq!(result["greeting"], "Hello, world!");
3834
3835 Ok(())
3836 }
3837
3838 #[tokio::test]
3839 async fn test_mcp_service_creation() {
3840 let service_id = "test_service_123".to_string();
3841 let node_id = "test_node_789".to_string();
3842
3843 let service = MCPService::new(service_id.clone(), node_id.clone());
3844
3845 assert_eq!(service.service_id, service_id);
3846 assert_eq!(service.node_id, node_id);
3847 assert!(service.tools.is_empty());
3848 assert_eq!(service.metadata.name, "MCP Service");
3849 assert_eq!(service.metadata.version, "1.0.0");
3850 assert_eq!(service.metadata.health_status, ServiceHealthStatus::Healthy);
3851 assert_eq!(service.endpoint.protocol, "p2p");
3852 assert!(!service.endpoint.tls);
3853 assert!(!service.endpoint.auth_required);
3854 }
3855
3856 #[tokio::test]
3857 async fn test_mcp_capabilities_default() {
3858 let capabilities = MCPCapabilities::default();
3859
3860 assert!(capabilities.tools.is_some());
3861 assert!(capabilities.prompts.is_some());
3862 assert!(capabilities.resources.is_some());
3863 assert!(capabilities.logging.is_some());
3864
3865 let tools_cap = capabilities.tools.expect("Test assertion failed");
3866 assert_eq!(tools_cap.list_changed, Some(true));
3867
3868 let resources_cap = capabilities.resources.expect("Test assertion failed");
3869 assert_eq!(resources_cap.subscribe, Some(true));
3870 assert_eq!(resources_cap.list_changed, Some(true));
3871
3872 let logging_cap = capabilities.logging.expect("Test assertion failed");
3873 let levels = logging_cap.levels.expect("Test assertion failed");
3874 assert!(levels.contains(&MCPLogLevel::Debug));
3875 assert!(levels.contains(&MCPLogLevel::Info));
3876 assert!(levels.contains(&MCPLogLevel::Warning));
3877 assert!(levels.contains(&MCPLogLevel::Error));
3878 }
3879
3880 #[tokio::test]
3881 async fn test_mcp_request_creation() {
3882 let source_peer = "source_peer_123".to_string();
3883 let target_peer = "target_peer_456".to_string();
3884
3885 let request = MCPRequest {
3886 request_id: uuid::Uuid::new_v4().to_string(),
3887 source_peer: source_peer.clone(),
3888 target_peer: target_peer.clone(),
3889 message: MCPMessage::ListTools { cursor: None },
3890 timestamp: SystemTime::now(),
3891 timeout: Duration::from_secs(30),
3892 auth_token: Some("test_token".to_string()),
3893 };
3894
3895 assert_eq!(request.source_peer, source_peer);
3896 assert_eq!(request.target_peer, target_peer);
3897 assert_eq!(request.timeout, Duration::from_secs(30));
3898 assert_eq!(request.auth_token, Some("test_token".to_string()));
3899
3900 match request.message {
3901 MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3902 _ => panic!("Wrong message type"),
3903 }
3904 }
3905
3906 #[tokio::test]
3907 async fn test_p2p_message_types() {
3908 assert_eq!(P2PMCPMessageType::Request, P2PMCPMessageType::Request);
3910 assert_eq!(P2PMCPMessageType::Response, P2PMCPMessageType::Response);
3911 assert_eq!(
3912 P2PMCPMessageType::ServiceAdvertisement,
3913 P2PMCPMessageType::ServiceAdvertisement
3914 );
3915 assert_eq!(
3916 P2PMCPMessageType::ServiceDiscovery,
3917 P2PMCPMessageType::ServiceDiscovery
3918 );
3919
3920 for msg_type in [
3922 P2PMCPMessageType::Request,
3923 P2PMCPMessageType::Response,
3924 P2PMCPMessageType::ServiceAdvertisement,
3925 P2PMCPMessageType::ServiceDiscovery,
3926 ] {
3927 let serialized = serde_json::to_string(&msg_type).expect("Test assertion failed");
3928 let deserialized: P2PMCPMessageType =
3929 serde_json::from_str(&serialized).expect("Test assertion failed");
3930 assert_eq!(msg_type, deserialized);
3931 }
3932 }
3933}