1#![allow(missing_docs)]
26
27pub mod security;
28
29use crate::dht::{DHT, DhtKey, Key};
30use crate::{P2PError, PeerId, Result};
31use rand;
32use serde::{Deserialize, Serialize};
33use serde_json::{Value, json};
34use std::collections::HashMap;
35use std::sync::Arc;
36use std::time::{Duration, Instant, SystemTime};
37use tokio::sync::{RwLock, mpsc, oneshot};
38use tokio::time::timeout;
39use tracing::{debug, info, warn};
40
41pub use security::*;
42
/// MCP protocol revision string used by this module.
pub const MCP_VERSION: &str = "2024-11-05";

/// Message size limit in bytes (1024 * 1024 = 1 MiB).
///
/// NOTE(review): no use of this limit is visible in this part of the file —
/// confirm where it is enforced.
pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;

/// Default timeout for a call (30 seconds); used as the default
/// `MCPServerConfig::request_timeout`.
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);

/// Protocol identifier passed to `NetworkSender::send_message` for MCP traffic.
pub const MCP_PROTOCOL: &str = "/p2p-foundation/mcp/1.0.0";
54
/// Transport abstraction the MCP server uses to deliver serialized messages to peers.
///
/// Implementations must be `Send + Sync` because the server shares the sender
/// across spawned tasks behind an `Arc`.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Sends `data` to `peer_id` using the protocol identifier `protocol`.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the peer id of the local node.
    fn local_peer_id(&self) -> &PeerId;
}
64
/// Shared, thread-safe callback with the same argument shape as
/// `NetworkSender::send_message` (peer, protocol, payload), but synchronous.
pub type MessageSender = Arc<dyn Fn(&PeerId, &str, Vec<u8>) -> Result<()> + Send + Sync>;
67
/// Pause between two consecutive DHT service-discovery lookups (60 seconds).
pub const SERVICE_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
70
/// Messages of the MCP layer.
///
/// Serialized by serde as an internally tagged enum: the variant name, converted
/// to `snake_case`, is stored under the `"type"` key (for example `CallTool`
/// becomes `"type": "call_tool"`), next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MCPMessage {
    /// Session initialization request.
    Initialize {
        /// Protocol version announced by the sender.
        protocol_version: String,
        /// Capabilities announced by the sender.
        capabilities: MCPCapabilities,
        /// Name and version of the client.
        client_info: MCPClientInfo,
    },
    /// Reply to [`MCPMessage::Initialize`].
    InitializeResult {
        /// Protocol version announced by the responder.
        protocol_version: String,
        /// Capabilities announced by the responder.
        capabilities: MCPCapabilities,
        /// Name and version of the server.
        server_info: MCPServerInfo,
    },
    /// Request for the list of tools.
    ListTools {
        /// Optional cursor (presumably a pagination token — confirm).
        cursor: Option<String>,
    },
    /// Reply to [`MCPMessage::ListTools`].
    ListToolsResult {
        /// Tool definitions.
        tools: Vec<MCPTool>,
        /// Cursor for a follow-up request, if any.
        next_cursor: Option<String>,
    },
    /// Request to invoke a tool.
    CallTool {
        /// Name of the tool to invoke.
        name: String,
        /// JSON arguments for the tool.
        arguments: Value,
    },
    /// Reply to [`MCPMessage::CallTool`].
    CallToolResult {
        /// Content items produced by the call.
        content: Vec<MCPContent>,
        /// Whether the result represents an error.
        is_error: bool,
    },
    /// Request for the list of prompts.
    ListPrompts {
        /// Optional cursor (presumably a pagination token — confirm).
        cursor: Option<String>,
    },
    /// Reply to [`MCPMessage::ListPrompts`].
    ListPromptsResult {
        /// Prompt definitions.
        prompts: Vec<MCPPrompt>,
        /// Cursor for a follow-up request, if any.
        next_cursor: Option<String>,
    },
    /// Request for a single prompt.
    GetPrompt {
        /// Name of the prompt.
        name: String,
        /// Optional JSON arguments for the prompt.
        arguments: Option<Value>,
    },
    /// Reply to [`MCPMessage::GetPrompt`].
    GetPromptResult {
        /// Optional description of the prompt.
        description: Option<String>,
        /// Messages that make up the prompt.
        messages: Vec<MCPPromptMessage>,
    },
    /// Request for the list of resources.
    ListResources {
        /// Optional cursor (presumably a pagination token — confirm).
        cursor: Option<String>,
    },
    /// Reply to [`MCPMessage::ListResources`].
    ListResourcesResult {
        /// Resource descriptors.
        resources: Vec<MCPResource>,
        /// Cursor for a follow-up request, if any.
        next_cursor: Option<String>,
    },
    /// Request to read a resource.
    ReadResource {
        /// URI of the resource.
        uri: String,
    },
    /// Reply to [`MCPMessage::ReadResource`].
    ReadResourceResult {
        /// Contents of the resource.
        contents: Vec<MCPResourceContent>,
    },
    /// Request to subscribe to a resource.
    SubscribeResource {
        /// URI of the resource.
        uri: String,
    },
    /// Request to cancel a resource subscription.
    UnsubscribeResource {
        /// URI of the resource.
        uri: String,
    },
    /// Notification that a resource changed.
    ResourceUpdated {
        /// URI of the resource.
        uri: String,
    },
    /// Request for log entries.
    ListLogs {
        /// Optional cursor (presumably a pagination token — confirm).
        cursor: Option<String>,
    },
    /// Reply to [`MCPMessage::ListLogs`].
    ListLogsResult {
        /// Log entries.
        logs: Vec<MCPLogEntry>,
        /// Cursor for a follow-up request, if any.
        next_cursor: Option<String>,
    },
    /// Request to change the log level.
    SetLogLevel {
        /// Requested level.
        level: MCPLogLevel,
    },
    /// Error report.
    Error {
        /// Numeric error code.
        code: i32,
        /// Error message text.
        message: String,
        /// Optional additional JSON data.
        data: Option<Value>,
    },
}
209
/// Capability set advertised by an MCP endpoint; every group is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPCapabilities {
    /// Free-form experimental capabilities.
    pub experimental: Option<Value>,
    /// Free-form sampling capabilities.
    pub sampling: Option<Value>,
    /// Tool-related capabilities.
    pub tools: Option<MCPToolsCapability>,
    /// Prompt-related capabilities.
    pub prompts: Option<MCPPromptsCapability>,
    /// Resource-related capabilities.
    pub resources: Option<MCPResourcesCapability>,
    /// Logging-related capabilities.
    pub logging: Option<MCPLoggingCapability>,
}
226
/// Tool-related capability flags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPToolsCapability {
    /// "List changed" flag for tools (presumably: change notifications are supported — confirm).
    pub list_changed: Option<bool>,
}
233
/// Prompt-related capability flags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPromptsCapability {
    /// "List changed" flag for prompts (presumably: change notifications are supported — confirm).
    pub list_changed: Option<bool>,
}
240
/// Resource-related capability flags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourcesCapability {
    /// Subscription flag for resources.
    pub subscribe: Option<bool>,
    /// "List changed" flag for resources (presumably: change notifications are supported — confirm).
    pub list_changed: Option<bool>,
}
249
/// Logging-related capabilities.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPLoggingCapability {
    /// Log levels advertised by the endpoint.
    pub levels: Option<Vec<MCPLogLevel>>,
}
256
/// Identification of an MCP client, carried in `MCPMessage::Initialize`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPClientInfo {
    /// Client name.
    pub name: String,
    /// Client version.
    pub version: String,
}
265
/// Identification of an MCP server, carried in `MCPMessage::InitializeResult`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServerInfo {
    /// Server name.
    pub name: String,
    /// Server version.
    pub version: String,
}
274
/// Serializable definition of a tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPTool {
    /// Tool name; used as the registry key. Registration rejects empty names
    /// and names longer than 100 bytes.
    pub name: String,
    /// Description of the tool.
    pub description: String,
    /// Schema for the tool's input; registration requires this to be a JSON object.
    pub input_schema: Value,
}
285
/// A registered tool: its definition, the handler that executes it, and runtime metadata.
pub struct Tool {
    /// Serializable definition returned by `list_tools`.
    pub definition: MCPTool,
    /// Handler used to validate arguments and execute the tool.
    pub handler: Box<dyn ToolHandler + Send + Sync>,
    /// Usage and health bookkeeping, updated after calls.
    pub metadata: ToolMetadata,
}
295
/// Runtime bookkeeping for a registered tool.
#[derive(Debug, Clone)]
pub struct ToolMetadata {
    /// Creation time of the tool entry.
    pub created_at: SystemTime,
    /// Time of the most recent recorded call, if any.
    pub last_called: Option<SystemTime>,
    /// Number of recorded calls.
    pub call_count: u64,
    /// Running mean of the recorded execution times.
    pub avg_execution_time: Duration,
    /// Current health classification of the tool.
    pub health_status: ToolHealthStatus,
    /// Free-form tags.
    pub tags: Vec<String>,
}
312
/// Health classification of a tool.
///
/// `update_tool_stats` moves a tool `Healthy -> Degraded -> Unhealthy` on
/// recorded failures and back to `Healthy` on a recorded success, and never
/// changes a `Disabled` tool.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ToolHealthStatus {
    /// No recent failure recorded.
    Healthy,
    /// One failure recorded since the tool was last healthy.
    Degraded,
    /// Repeated failures recorded.
    Unhealthy,
    /// Status is left untouched by the automatic health updates.
    Disabled,
}
325
/// Settings for the background health monitor started by `MCPServer::start`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMonitorConfig {
    /// Period of the health-check loop over known remote services.
    pub health_check_interval: Duration,
    /// Upper bound on the time a single health-check send may take.
    pub health_check_timeout: Duration,
    /// Failure threshold (presumably consecutive failures before a service is
    /// marked unhealthy — confirm against the health-check logic).
    pub failure_threshold: u32,
    /// Success threshold (presumably consecutive successes before a service is
    /// marked healthy again — confirm against the health-check logic).
    pub success_threshold: u32,
    /// Period of the heartbeat-sending loop.
    pub heartbeat_interval: Duration,
    /// Heartbeat timeout handed to the heartbeat-timeout check.
    pub heartbeat_timeout: Duration,
    /// When `false`, no health-monitoring tasks are started.
    pub enabled: bool,
}
344
345impl Default for HealthMonitorConfig {
346 fn default() -> Self {
347 Self {
348 health_check_interval: Duration::from_secs(30),
349 health_check_timeout: Duration::from_secs(5),
350 failure_threshold: 3,
351 success_threshold: 2,
352 heartbeat_interval: Duration::from_secs(60),
353 heartbeat_timeout: Duration::from_secs(300), enabled: true,
355 }
356 }
357}
358
/// Health record kept per remote service, keyed by service id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceHealth {
    /// Id of the service this record belongs to.
    pub service_id: String,
    /// Current health classification; new records start as `Unknown`.
    pub status: ServiceHealthStatus,
    /// Time of the last health check recorded as successful, if any.
    pub last_health_check: Option<SystemTime>,
    /// Time of the last heartbeat, if any.
    pub last_heartbeat: Option<SystemTime>,
    /// Failure counter; reset to 0 when a health check succeeds.
    pub failure_count: u32,
    /// Success counter; incremented when a health check succeeds.
    pub success_count: u32,
    /// Average response time of the service.
    pub avg_response_time: Duration,
    /// Most recent error message, if any.
    pub error_message: Option<String>,
    /// Recent health-check results; the health check keeps at most the last 10.
    pub health_history: Vec<HealthCheckResult>,
}
381
/// Outcome of a single health check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
    /// When the result was recorded.
    pub timestamp: SystemTime,
    /// Whether the health-check message was sent successfully within the timeout.
    pub success: bool,
    /// Time elapsed between starting the check and the send completing or timing out.
    pub response_time: Duration,
    /// Error description; `None` when the check succeeded.
    pub error_message: Option<String>,
}
394
/// Heartbeat payload describing a service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
    /// Id of the service sending the heartbeat.
    pub service_id: String,
    /// Peer hosting the service.
    pub peer_id: PeerId,
    /// Time the heartbeat was produced.
    pub timestamp: SystemTime,
    /// Reported load (scale and unit are not specified here — confirm with the producer).
    pub load: f32,
    /// Names of the tools the service reports as available.
    pub available_tools: Vec<String>,
    /// Capabilities reported by the service.
    pub capabilities: MCPCapabilities,
}
411
/// Events published on the server's health-event channel.
#[derive(Debug, Clone)]
pub enum HealthEvent {
    /// A service was classified as healthy.
    ServiceHealthy { service_id: String, peer_id: PeerId },
    /// A service was classified as unhealthy; `error` carries the reason.
    ServiceUnhealthy {
        service_id: String,
        peer_id: PeerId,
        error: String,
    },
    /// A service was classified as degraded; `reason` carries the explanation.
    ServiceDegraded {
        service_id: String,
        peer_id: PeerId,
        reason: String,
    },
    /// A heartbeat was received; `load` is the load value it reported.
    HeartbeatReceived {
        service_id: String,
        peer_id: PeerId,
        load: f32,
    },
    /// A service's heartbeat timed out.
    HeartbeatTimeout { service_id: String, peer_id: PeerId },
}
438
/// Behaviour a tool must provide to be executed by the server.
pub trait ToolHandler {
    /// Executes the tool with the given JSON `arguments`.
    ///
    /// Returns a boxed, `Send` future that borrows `self` and resolves to the
    /// tool's JSON result or an error.
    fn execute(
        &self,
        arguments: Value,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>>;

    /// Validates `arguments` before execution.
    ///
    /// The default implementation accepts every input.
    fn validate(&self, arguments: &Value) -> Result<()> {
        // Explicitly discard the parameter so the default body does not warn about it.
        let _ = arguments;
        Ok(())
    }

    /// Declares the resources the tool needs.
    ///
    /// The default implementation returns `ToolRequirements::default()`.
    fn get_requirements(&self) -> ToolRequirements {
        ToolRequirements::default()
    }
}
459
/// Resource requirements a tool declares through `ToolHandler::get_requirements`.
#[derive(Debug, Clone)]
pub struct ToolRequirements {
    /// Memory the tool declares it needs, in bytes; `None` declares no figure.
    pub max_memory: Option<u64>,
    /// Execution time the tool declares it needs; `None` declares no figure.
    pub max_execution_time: Option<Duration>,
    /// Names of capabilities the tool requires.
    pub required_capabilities: Vec<String>,
    /// Whether the tool needs network access.
    pub requires_network: bool,
    /// Whether the tool needs filesystem access.
    pub requires_filesystem: bool,
}

impl Default for ToolRequirements {
    /// Returns the default requirements: 100 MiB of memory, 30 seconds of
    /// execution time, no required capabilities, and neither network nor
    /// filesystem access.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;

        ToolRequirements {
            requires_filesystem: false,
            requires_network: false,
            required_capabilities: vec![],
            max_execution_time: Some(Duration::from_secs(30)),
            max_memory: Some(100 * MIB),
        }
    }
}
486
/// A piece of content carried in tool results and prompt messages.
///
/// Serialized as an internally tagged enum: the `snake_case` variant name is
/// stored under the `"type"` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MCPContent {
    /// Plain text content.
    Text {
        /// The text.
        text: String,
    },
    /// Image content.
    Image {
        /// Image data as a string (presumably base64-encoded — confirm).
        data: String,
        /// MIME type of the image.
        mime_type: String,
    },
    /// Reference to a resource.
    Resource {
        /// The referenced resource.
        resource: MCPResourceReference,
    },
}
509
/// Reference to a resource by URI.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourceReference {
    /// URI of the resource.
    pub uri: String,
    /// Optional type of the resource.
    ///
    /// NOTE(review): with no serde rename this field is (de)serialized under
    /// the key `type_`, not `type` — confirm that this is the intended wire name.
    pub type_: Option<String>,
}
518
/// Definition of a prompt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPrompt {
    /// Prompt name.
    pub name: String,
    /// Optional description.
    pub description: Option<String>,
    /// Optional JSON description of the prompt's arguments.
    pub arguments: Option<Value>,
}
529
/// One message of a prompt: a role paired with its content.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPromptMessage {
    /// Role the message is attributed to.
    pub role: MCPRole,
    /// Content of the message.
    pub content: MCPContent,
}
538
/// Role of a prompt message; serialized as `"user"`, `"assistant"` or `"system"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MCPRole {
    /// The user role.
    User,
    /// The assistant role.
    Assistant,
    /// The system role.
    System,
}
550
/// Descriptor of a resource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResource {
    /// URI of the resource.
    pub uri: String,
    /// Resource name.
    pub name: String,
    /// Optional description.
    pub description: Option<String>,
    /// Optional MIME type.
    pub mime_type: Option<String>,
}
563
/// Content of a resource, returned in `MCPMessage::ReadResourceResult`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourceContent {
    /// URI of the resource.
    pub uri: String,
    /// MIME type of the content.
    pub mime_type: String,
    /// Textual content, if any.
    pub text: Option<String>,
    /// Binary content as a string, if any (presumably base64-encoded — confirm).
    pub blob: Option<String>,
}
576
/// Log levels; serialized as the `snake_case` variant name (e.g. `"warning"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum MCPLogLevel {
    /// Debug level.
    Debug,
    /// Info level.
    Info,
    /// Notice level.
    Notice,
    /// Warning level.
    Warning,
    /// Error level.
    Error,
    /// Critical level.
    Critical,
    /// Alert level.
    Alert,
    /// Emergency level.
    Emergency,
}
598
/// A single log entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPLogEntry {
    /// Level of the entry.
    pub level: MCPLogLevel,
    /// JSON payload of the entry.
    pub data: Value,
    /// Optional name of the logger that produced the entry.
    pub logger: Option<String>,
}
609
/// Description of an MCP service as stored in the local and remote service caches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPService {
    /// Service id; used as the key of the service caches.
    pub service_id: String,
    /// Peer hosting the service; health checks are sent to this peer.
    pub node_id: PeerId,
    /// Names of the tools the service offers.
    pub tools: Vec<String>,
    /// Capabilities of the service.
    pub capabilities: MCPCapabilities,
    /// Descriptive metadata.
    pub metadata: MCPServiceMetadata,
    /// Time the service was registered.
    pub registered_at: SystemTime,
    /// Endpoint of the service.
    pub endpoint: MCPEndpoint,
}
628
/// Descriptive metadata of an MCP service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServiceMetadata {
    /// Service name.
    pub name: String,
    /// Service version.
    pub version: String,
    /// Optional description.
    pub description: Option<String>,
    /// Free-form tags.
    pub tags: Vec<String>,
    /// Health classification of the service.
    pub health_status: ServiceHealthStatus,
    /// Load figures of the service.
    pub load_metrics: ServiceLoadMetrics,
}
645
/// Health classification of a service.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum ServiceHealthStatus {
    /// The service is healthy.
    Healthy,
    /// The service is degraded.
    Degraded,
    /// The service is unhealthy.
    Unhealthy,
    /// The service is disabled.
    Disabled,
    /// No classification yet; the initial status of a new `ServiceHealth` record.
    Unknown,
}
660
/// Load figures of a service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceLoadMetrics {
    /// Number of requests currently in flight.
    pub active_requests: u32,
    /// Requests per second.
    pub requests_per_second: f64,
    /// Average response time in milliseconds.
    pub avg_response_time_ms: f64,
    /// Error rate (scale not specified here — fraction or percent, confirm with the producer).
    pub error_rate: f64,
    /// CPU usage (scale not specified here — confirm with the producer).
    pub cpu_usage: f64,
    /// Memory usage (presumably bytes — confirm with the producer).
    pub memory_usage: u64,
}
677
/// Endpoint at which a service can be reached.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPEndpoint {
    /// Protocol name.
    pub protocol: String,
    /// Address of the endpoint.
    pub address: String,
    /// Optional port.
    pub port: Option<u16>,
    /// Whether TLS is used.
    pub tls: bool,
    /// Whether authentication is required.
    pub auth_required: bool,
}
692
/// An MCP request together with its routing and timing information.
#[derive(Debug, Clone)]
pub struct MCPRequest {
    /// Id of the request.
    pub request_id: String,
    /// Peer that issued the request.
    pub source_peer: PeerId,
    /// Peer the request is addressed to.
    pub target_peer: PeerId,
    /// The MCP message being requested.
    pub message: MCPMessage,
    /// Time associated with the request.
    pub timestamp: SystemTime,
    /// Timeout for the request.
    pub timeout: Duration,
    /// Optional authentication token.
    pub auth_token: Option<String>,
}
711
/// Serializable envelope for an MCP message exchanged between peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct P2PMCPMessage {
    /// Kind of envelope.
    pub message_type: P2PMCPMessageType,
    /// Id of the message.
    pub message_id: String,
    /// Peer that sent the message.
    pub source_peer: PeerId,
    /// Peer the message is addressed to, if any.
    pub target_peer: Option<PeerId>,
    /// Timestamp as an integer (presumably seconds since the Unix epoch — confirm).
    pub timestamp: u64,
    /// The MCP message carried by the envelope.
    pub payload: MCPMessage,
    /// Time-to-live value (presumably a hop count — confirm).
    pub ttl: u8,
}
730
/// Kind of a [`P2PMCPMessage`] envelope.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum P2PMCPMessageType {
    /// A request.
    Request,
    /// A response.
    Response,
    /// A service advertisement.
    ServiceAdvertisement,
    /// A service-discovery message.
    ServiceDiscovery,
    /// A heartbeat.
    Heartbeat,
    /// A health check.
    HealthCheck,
}
747
/// Response to an [`MCPRequest`].
#[derive(Debug, Clone)]
pub struct MCPResponse {
    /// Id of the request this response answers.
    pub request_id: String,
    /// The MCP message carried by the response.
    pub message: MCPMessage,
    /// Time associated with the response.
    pub timestamp: SystemTime,
    /// Time spent processing the request.
    pub processing_time: Duration,
}
760
/// Caller-side context passed to `MCPServer::call_tool`.
#[derive(Debug, Clone)]
pub struct MCPCallContext {
    /// Peer making the call; used for rate-limit, permission and trust checks.
    pub caller_id: PeerId,
    /// Time associated with the call.
    pub timestamp: SystemTime,
    /// Upper bound on the tool's execution time for this call.
    pub timeout: Duration,
    /// Authentication information; its token is verified for tools whose
    /// security level is `Basic` when authentication is enabled.
    pub auth_info: Option<MCPAuthInfo>,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
}
775
/// Authentication information attached to a call.
#[derive(Debug, Clone)]
pub struct MCPAuthInfo {
    /// The token; passed to the server's token verification.
    pub token: String,
    /// Type of the token.
    pub token_type: String,
    /// Expiry time of the token, if any.
    pub expires_at: Option<SystemTime>,
    /// Permission names associated with the token.
    pub permissions: Vec<String>,
}
788
/// Configuration of an [`MCPServer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServerConfig {
    /// Server name; logged when the server starts.
    pub server_name: String,
    /// Server version.
    pub server_version: String,
    /// Whether DHT discovery is enabled.
    pub enable_dht_discovery: bool,
    /// Maximum number of concurrent requests.
    pub max_concurrent_requests: usize,
    /// Timeout for a request.
    pub request_timeout: Duration,
    /// Whether authentication is enabled. When `true`, `MCPServer::new`
    /// creates a security manager and `Basic`-level tools require a token.
    pub enable_auth: bool,
    /// Whether rate limiting is enabled.
    pub enable_rate_limiting: bool,
    /// Rate limit handed to the security manager (requests per minute, going by the name).
    pub rate_limit_rpm: u32,
    /// Whether logging is enabled.
    pub enable_logging: bool,
    /// Largest execution time a tool may declare in its requirements.
    pub max_tool_execution_time: Duration,
    /// Largest memory figure, in bytes, a tool may declare in its requirements.
    pub tool_memory_limit: u64,
    /// Settings of the background health monitor.
    pub health_monitor: HealthMonitorConfig,
}
817
818impl Default for MCPServerConfig {
819 fn default() -> Self {
820 Self {
821 server_name: "P2P-MCP-Server".to_string(),
822 server_version: crate::VERSION.to_string(),
823 enable_dht_discovery: true,
824 max_concurrent_requests: 100,
825 request_timeout: DEFAULT_CALL_TIMEOUT,
826 enable_auth: true,
827 enable_rate_limiting: true,
828 rate_limit_rpm: 60,
829 enable_logging: true,
830 max_tool_execution_time: Duration::from_secs(30),
831 tool_memory_limit: 100 * 1024 * 1024, health_monitor: HealthMonitorConfig::default(),
833 }
834 }
835}
836
/// MCP server state: tool registry, service caches, statistics, security
/// helpers and the channels used by its background tasks.
pub struct MCPServer {
    /// Server configuration.
    config: MCPServerConfig,
    /// Registered tools keyed by tool name.
    tools: Arc<RwLock<HashMap<String, Tool>>>,
    /// Registered prompts keyed by name.
    // NOTE(review): not read in the visible part of this file, hence the allow.
    #[allow(dead_code)]
    prompts: Arc<RwLock<HashMap<String, MCPPrompt>>>,
    /// Registered resources keyed by name.
    // NOTE(review): not read in the visible part of this file, hence the allow.
    #[allow(dead_code)]
    resources: Arc<RwLock<HashMap<String, MCPResource>>>,
    /// Sessions keyed by session id.
    sessions: Arc<RwLock<HashMap<String, MCPSession>>>,
    /// One-shot senders for pending responses, keyed by a string id.
    request_handlers: Arc<RwLock<HashMap<String, oneshot::Sender<MCPResponse>>>>,
    /// DHT handle; `None` until `with_dht` is called.
    dht: Option<Arc<RwLock<DHT>>>,
    /// Services offered by this node, keyed by service id.
    local_services: Arc<RwLock<HashMap<String, MCPService>>>,
    /// Services discovered through the DHT, keyed by service id.
    remote_services: Arc<RwLock<HashMap<String, MCPService>>>,
    /// Aggregate server statistics.
    stats: Arc<RwLock<MCPServerStats>>,
    /// Sending half of the request channel.
    request_tx: mpsc::UnboundedSender<MCPRequest>,
    /// Receiving half of the response channel.
    // NOTE(review): not read in the visible part of this file, hence the allow.
    #[allow(dead_code)]
    response_rx: Arc<RwLock<mpsc::UnboundedReceiver<MCPResponse>>>,
    /// Security manager; present only when `config.enable_auth` is set.
    security_manager: Option<Arc<MCPSecurityManager>>,
    /// Audit logger; receives an event for each tool call that passes the security checks.
    audit_logger: Arc<SecurityAuditLogger>,
    /// Network sender; `None` until one is configured.
    network_sender: Arc<RwLock<Option<Arc<dyn NetworkSender>>>>,
    /// Health records of remote services keyed by service id.
    service_health: Arc<RwLock<HashMap<String, ServiceHealth>>>,
    /// Sending half of the health-event channel.
    health_event_tx: mpsc::UnboundedSender<HealthEvent>,
    /// Receiving half of the health-event channel.
    // NOTE(review): not read in the visible part of this file, hence the allow.
    #[allow(dead_code)]
    health_event_rx: Arc<RwLock<mpsc::UnboundedReceiver<HealthEvent>>>,
    /// Local peer id; set by `with_network_sender`, otherwise `None`.
    node_id: Option<PeerId>,
}
882
/// State of a session with a peer.
#[derive(Debug, Clone)]
pub struct MCPSession {
    /// Id of the session.
    pub session_id: String,
    /// Peer the session belongs to.
    pub peer_id: PeerId,
    /// Capabilities announced by the client, if known.
    pub client_capabilities: Option<MCPCapabilities>,
    /// Time the session started.
    pub started_at: SystemTime,
    /// Time of the last activity in the session.
    pub last_activity: SystemTime,
    /// Lifecycle state of the session.
    pub state: MCPSessionState,
    /// URIs of the resources the session is subscribed to.
    pub subscribed_resources: Vec<String>,
}
901
/// Lifecycle state of an [`MCPSession`].
#[derive(Debug, Clone, PartialEq)]
pub enum MCPSessionState {
    /// The session is being initialized.
    Initializing,
    /// The session is active.
    Active,
    /// The session is inactive.
    Inactive,
    /// The session has been terminated.
    Terminated,
}
914
/// Aggregate statistics of an MCP server.
#[derive(Debug, Clone)]
pub struct MCPServerStats {
    /// Number of requests counted so far.
    pub total_requests: u64,
    /// Number of responses counted so far.
    pub total_responses: u64,
    /// Number of errors counted so far.
    pub total_errors: u64,
    /// Running mean of the response times.
    pub avg_response_time: Duration,
    /// Number of active sessions.
    pub active_sessions: u32,
    /// Number of registered tools.
    pub total_tools: u32,
    /// Call count per tool name.
    pub popular_tools: HashMap<String, u64>,
    /// Time the statistics were created.
    pub server_started_at: SystemTime,
}

impl Default for MCPServerStats {
    /// Returns zeroed statistics whose start time is the moment of the call.
    fn default() -> Self {
        let server_started_at = SystemTime::now();

        MCPServerStats {
            server_started_at,
            popular_tools: HashMap::new(),
            avg_response_time: Duration::from_millis(0),
            total_requests: 0,
            total_responses: 0,
            total_errors: 0,
            total_tools: 0,
            active_sessions: 0,
        }
    }
}
950
951impl MCPServer {
952 pub fn new(config: MCPServerConfig) -> Self {
954 let (request_tx, _request_rx) = mpsc::unbounded_channel();
955 let (_response_tx, response_rx) = mpsc::unbounded_channel();
956 let (health_event_tx, health_event_rx) = mpsc::unbounded_channel();
957
958 let security_manager = if config.enable_auth {
960 let secret_key = (0..32).map(|_| rand::random::<u8>()).collect();
962 Some(Arc::new(MCPSecurityManager::new(
963 secret_key,
964 config.rate_limit_rpm,
965 )))
966 } else {
967 None
968 };
969
970 Self {
971 config,
972 tools: Arc::new(RwLock::new(HashMap::new())),
973 prompts: Arc::new(RwLock::new(HashMap::new())),
974 resources: Arc::new(RwLock::new(HashMap::new())),
975 sessions: Arc::new(RwLock::new(HashMap::new())),
976 request_handlers: Arc::new(RwLock::new(HashMap::new())),
977 dht: None,
978 local_services: Arc::new(RwLock::new(HashMap::new())),
979 remote_services: Arc::new(RwLock::new(HashMap::new())),
980 stats: Arc::new(RwLock::new(MCPServerStats::default())),
981 request_tx,
982 response_rx: Arc::new(RwLock::new(response_rx)),
983 security_manager,
984 audit_logger: Arc::new(SecurityAuditLogger::new(10000)), network_sender: Arc::new(RwLock::new(None)),
986 service_health: Arc::new(RwLock::new(HashMap::new())),
987 health_event_tx,
988 health_event_rx: Arc::new(RwLock::new(health_event_rx)),
989 node_id: None,
990 }
991 }
992
993 pub fn with_dht(mut self, dht: Arc<RwLock<DHT>>) -> Self {
995 self.dht = Some(dht);
996 self
997 }
998
999 pub async fn with_network_sender(mut self, sender: Arc<dyn NetworkSender>) -> Self {
1001 let peer_id = sender.local_peer_id().clone();
1002 self.node_id = Some(peer_id);
1003 *self.network_sender.write().await = Some(sender);
1004 self
1005 }
1006
1007 pub async fn set_network_sender(&self, sender: Arc<dyn NetworkSender>) {
1009 let peer_id = sender.local_peer_id().clone();
1010 *self.network_sender.write().await = Some(sender);
1011 info!("MCP server network sender configured for peer {}", peer_id);
1012 }
1013
1014 fn get_node_id_string(&self) -> String {
1016 self.node_id
1017 .as_ref()
1018 .map(|id| id.to_string())
1019 .unwrap_or_else(|| "uninitialized".to_string())
1020 }
1021
1022 pub async fn start(&self) -> Result<()> {
1024 info!("Starting MCP server: {}", self.config.server_name);
1025
1026 self.start_request_processor().await?;
1028
1029 if self.dht.is_some() {
1031 self.start_service_discovery().await?;
1032 }
1033
1034 self.start_health_monitor().await?;
1036
1037 info!("MCP server started successfully");
1038 Ok(())
1039 }
1040
1041 pub async fn register_tool(&self, tool: Tool) -> Result<()> {
1043 let tool_name = tool.definition.name.clone();
1044
1045 self.validate_tool(&tool).await?;
1047
1048 {
1050 let mut tools = self.tools.write().await;
1051 tools.insert(tool_name.clone(), tool);
1052 }
1053
1054 {
1056 let mut stats = self.stats.write().await;
1057 stats.total_tools += 1;
1058 }
1059
1060 if let Some(dht) = &self.dht {
1062 self.register_tool_in_dht(&tool_name, dht).await?;
1063 }
1064
1065 if let Err(e) = self.announce_local_services().await {
1067 warn!("Failed to announce service after tool registration: {}", e);
1068 }
1069
1070 info!("Registered tool: {}", tool_name);
1071 Ok(())
1072 }
1073
1074 async fn validate_tool(&self, tool: &Tool) -> Result<()> {
1076 let tools = self.tools.read().await;
1078 if tools.contains_key(&tool.definition.name) {
1079 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1080 format!("Tool already exists: {}", tool.definition.name).into(),
1081 )));
1082 }
1083
1084 if tool.definition.name.is_empty() || tool.definition.name.len() > 100 {
1086 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1087 "Invalid tool name".to_string().into(),
1088 )));
1089 }
1090
1091 if !tool.definition.input_schema.is_object() {
1093 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1094 "Tool input schema must be an object".to_string().into(),
1095 )));
1096 }
1097
1098 Ok(())
1099 }
1100
1101 async fn register_tool_in_dht(&self, tool_name: &str, dht: &Arc<RwLock<DHT>>) -> Result<()> {
1103 let hash = blake3::hash(format!("mcp:tool:{tool_name}").as_bytes());
1104 let key: Key = *hash.as_bytes();
1105 let service_info = json!({
1106 "tool_name": tool_name,
1107 "node_id": self.get_node_id_string(),
1108 "registered_at": SystemTime::now().duration_since(std::time::UNIX_EPOCH).map_err(|e| P2PError::Identity(crate::error::IdentityError::SystemTime(format!("Time error: {e}").into())))?.as_secs(),
1109 "capabilities": self.get_server_capabilities().await
1110 });
1111
1112 let mut dht_guard = dht.write().await;
1113 dht_guard
1114 .store(&DhtKey::from_bytes(key), serde_json::to_vec(&service_info)?)
1115 .await
1116 .map_err(|e| {
1117 P2PError::Dht(crate::error::DhtError::StoreFailed(
1118 format!("Failed to register service: {e}").into(),
1119 ))
1120 })?;
1121
1122 Ok(())
1123 }
1124
1125 async fn get_server_capabilities(&self) -> MCPCapabilities {
1127 MCPCapabilities {
1128 experimental: None,
1129 sampling: None,
1130 tools: Some(MCPToolsCapability {
1131 list_changed: Some(true),
1132 }),
1133 prompts: Some(MCPPromptsCapability {
1134 list_changed: Some(true),
1135 }),
1136 resources: Some(MCPResourcesCapability {
1137 subscribe: Some(true),
1138 list_changed: Some(true),
1139 }),
1140 logging: Some(MCPLoggingCapability {
1141 levels: Some(vec![
1142 MCPLogLevel::Debug,
1143 MCPLogLevel::Info,
1144 MCPLogLevel::Warning,
1145 MCPLogLevel::Error,
1146 ]),
1147 }),
1148 }
1149 }
1150
1151 pub async fn call_tool(
1153 &self,
1154 tool_name: &str,
1155 arguments: Value,
1156 context: MCPCallContext,
1157 ) -> Result<Value> {
1158 let start_time = Instant::now();
1159
1160 if !self.check_rate_limit(&context.caller_id).await? {
1164 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1165 "Rate limit exceeded".to_string().into(),
1166 )));
1167 }
1168
1169 if !self
1171 .check_permission(&context.caller_id, &MCPPermission::ExecuteTools)
1172 .await?
1173 {
1174 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1175 "Permission denied: execute tools".to_string().into(),
1176 )));
1177 }
1178
1179 let tool_security_level = self.get_tool_security_policy(tool_name).await;
1181 let is_trusted = self.is_trusted_peer(&context.caller_id).await;
1182
1183 match tool_security_level {
1184 SecurityLevel::Admin => {
1185 if !self
1186 .check_permission(&context.caller_id, &MCPPermission::Admin)
1187 .await?
1188 {
1189 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1190 "Permission denied: admin access required"
1191 .to_string()
1192 .into(),
1193 )));
1194 }
1195 }
1196 SecurityLevel::Strong => {
1197 if !is_trusted {
1198 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1199 "Permission denied: trusted peer required"
1200 .to_string()
1201 .into(),
1202 )));
1203 }
1204 }
1205 SecurityLevel::Basic => {
1206 if self.config.enable_auth {
1208 if let Some(auth_info) = &context.auth_info {
1209 self.verify_auth_token(&auth_info.token).await?;
1210 } else {
1211 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1212 "Authentication required".to_string().into(),
1213 )));
1214 }
1215 }
1216 }
1217 SecurityLevel::Public => {
1218 }
1220 }
1221
1222 let mut details = HashMap::new();
1224 details.insert("action".to_string(), "tool_call".to_string());
1225 details.insert("tool_name".to_string(), tool_name.to_string());
1226 details.insert(
1227 "security_level".to_string(),
1228 format!("{tool_security_level:?}"),
1229 );
1230
1231 self.audit_logger
1232 .log_event(
1233 "tool_execution".to_string(),
1234 context.caller_id.clone(),
1235 details,
1236 AuditSeverity::Info,
1237 )
1238 .await;
1239
1240 let tool_exists = {
1242 let tools = self.tools.read().await;
1243 tools.contains_key(tool_name)
1244 };
1245
1246 if !tool_exists {
1247 return Err(P2PError::Mcp(crate::error::McpError::ToolNotFound(
1248 tool_name.to_string().into(),
1249 )));
1250 }
1251
1252 let requirements = {
1254 let tools = self.tools.read().await;
1255 let tool = tools.get(tool_name).ok_or_else(|| {
1256 P2PError::Mcp(crate::error::McpError::ToolNotFound(
1257 tool_name.to_string().into(),
1258 ))
1259 })?;
1260
1261 if let Err(e) = tool.handler.validate(&arguments) {
1263 return Err(P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1264 format!("{}: Validation failed: {e}", tool_name).into(),
1265 )));
1266 }
1267
1268 tool.handler.get_requirements()
1270 };
1271
1272 self.check_resource_requirements(&requirements).await?;
1274
1275 let tools_clone = self.tools.clone();
1277 let tool_name_owned = tool_name.to_string();
1278 let execution_timeout = context
1279 .timeout
1280 .min(requirements.max_execution_time.unwrap_or(context.timeout));
1281
1282 let result = timeout(execution_timeout, async move {
1283 let tools = tools_clone.read().await;
1284 let tool = tools.get(&tool_name_owned).ok_or_else(|| {
1285 P2PError::Mcp(crate::error::McpError::ToolNotFound(
1286 tool_name_owned.clone().into(),
1287 ))
1288 })?;
1289 tool.handler.execute(arguments).await
1290 })
1291 .await
1292 .map_err(|_| {
1293 P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1294 format!("{}: Tool execution timeout", tool_name).into(),
1295 ))
1296 })?
1297 .map_err(|e| {
1298 P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1299 format!("{}: {e}", tool_name).into(),
1300 ))
1301 })?;
1302
1303 let execution_time = start_time.elapsed();
1304
1305 self.update_tool_stats(tool_name, execution_time, true)
1307 .await;
1308
1309 {
1311 let mut stats = self.stats.write().await;
1312 stats.total_requests += 1;
1313 stats.total_responses += 1;
1314
1315 let new_total_time = stats
1317 .avg_response_time
1318 .mul_f64(stats.total_responses as f64 - 1.0)
1319 + execution_time;
1320 stats.avg_response_time = new_total_time.div_f64(stats.total_responses as f64);
1321
1322 *stats
1324 .popular_tools
1325 .entry(tool_name.to_string())
1326 .or_insert(0) += 1;
1327 }
1328
1329 debug!("Tool '{}' executed in {:?}", tool_name, execution_time);
1330 Ok(result)
1331 }
1332
1333 async fn check_resource_requirements(&self, requirements: &ToolRequirements) -> Result<()> {
1335 if let Some(max_memory) = requirements.max_memory
1337 && max_memory > self.config.tool_memory_limit
1338 {
1339 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1340 "Tool memory requirement exceeds limit".to_string().into(),
1341 )));
1342 }
1343
1344 if let Some(max_execution_time) = requirements.max_execution_time
1346 && max_execution_time > self.config.max_tool_execution_time
1347 {
1348 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1349 "Tool execution time requirement exceeds limit"
1350 .to_string()
1351 .into(),
1352 )));
1353 }
1354
1355 Ok(())
1358 }
1359
1360 async fn update_tool_stats(&self, tool_name: &str, execution_time: Duration, success: bool) {
1362 let mut tools = self.tools.write().await;
1363 if let Some(tool) = tools.get_mut(tool_name) {
1364 tool.metadata.call_count += 1;
1365 tool.metadata.last_called = Some(SystemTime::now());
1366
1367 let new_total_time = tool
1369 .metadata
1370 .avg_execution_time
1371 .mul_f64(tool.metadata.call_count as f64 - 1.0)
1372 + execution_time;
1373 tool.metadata.avg_execution_time =
1374 new_total_time.div_f64(tool.metadata.call_count as f64);
1375
1376 if !success {
1378 tool.metadata.health_status = match tool.metadata.health_status {
1379 ToolHealthStatus::Healthy => ToolHealthStatus::Degraded,
1380 ToolHealthStatus::Degraded => ToolHealthStatus::Unhealthy,
1381 other => other,
1382 };
1383 } else if tool.metadata.health_status != ToolHealthStatus::Disabled {
1384 tool.metadata.health_status = ToolHealthStatus::Healthy;
1385 }
1386 }
1387 }
1388
1389 pub async fn list_tools(
1391 &self,
1392 _cursor: Option<String>,
1393 ) -> Result<(Vec<MCPTool>, Option<String>)> {
1394 let tools = self.tools.read().await;
1395 let tool_definitions: Vec<MCPTool> =
1396 tools.values().map(|tool| tool.definition.clone()).collect();
1397
1398 Ok((tool_definitions, None))
1401 }
1402
1403 async fn start_request_processor(&self) -> Result<()> {
1405 let _request_tx = self.request_tx.clone();
1406 let _server_clone = Arc::new(self);
1407
1408 tokio::spawn(async move {
1409 info!("MCP request processor started");
1410
1411 tokio::time::sleep(Duration::from_millis(100)).await;
1417
1418 info!("MCP request processor stopped");
1419 });
1420
1421 Ok(())
1422 }
1423
1424 async fn start_service_discovery(&self) -> Result<()> {
1426 if let Some(dht) = self.dht.clone() {
1427 let _stats = self.stats.clone();
1428 let remote_services = self.remote_services.clone();
1429
1430 tokio::spawn(async move {
1431 info!("MCP service discovery started");
1432
1433 loop {
1434 tokio::time::sleep(SERVICE_DISCOVERY_INTERVAL).await;
1436
1437 let hash = blake3::hash(b"mcp:services");
1439 let key: Key = *hash.as_bytes();
1440 let dht_guard = dht.read().await;
1441
1442 match dht_guard.retrieve(&DhtKey::from_bytes(key)).await {
1443 Ok(Some(value)) => {
1444 match serde_json::from_slice::<Vec<MCPService>>(&value) {
1445 Ok(services) => {
1446 debug!("Discovered {} MCP services", services.len());
1447
1448 {
1450 let mut remote_cache = remote_services.write().await;
1451 for service in services {
1452 remote_cache
1453 .insert(service.service_id.clone(), service);
1454 }
1455 }
1456 }
1457 Err(e) => {
1458 debug!("Failed to deserialize services: {}", e);
1459 }
1460 }
1461 }
1462 Ok(None) | Err(_) => {
1463 debug!("No MCP services found in DHT");
1464 }
1465 }
1466 }
1467 });
1468 }
1469
1470 Ok(())
1471 }
1472
1473 async fn start_health_monitor(&self) -> Result<()> {
1475 if !self.config.health_monitor.enabled {
1476 debug!("Health monitoring is disabled");
1477 return Ok(());
1478 }
1479
1480 info!(
1481 "Starting health monitoring with interval: {:?}",
1482 self.config.health_monitor.health_check_interval
1483 );
1484
1485 let service_health = Arc::clone(&self.service_health);
1487 let remote_services = Arc::clone(&self.remote_services);
1488 let network_sender = Arc::clone(&self.network_sender);
1489 let health_event_tx = self.health_event_tx.clone();
1490 let config = self.config.health_monitor.clone();
1491
1492 let health_check_task = {
1494 let service_health = Arc::clone(&service_health);
1495 let remote_services = Arc::clone(&remote_services);
1496 let network_sender = Arc::clone(&network_sender);
1497 let health_event_tx = health_event_tx.clone();
1498 let config = config.clone();
1499
1500 tokio::spawn(async move {
1501 let mut interval = tokio::time::interval(config.health_check_interval);
1502
1503 loop {
1504 interval.tick().await;
1505
1506 let services_to_check: Vec<MCPService> = {
1508 let remote_guard = remote_services.read().await;
1509 remote_guard.values().cloned().collect()
1510 };
1511
1512 for service in services_to_check {
1514 if let Some(sender) = network_sender.read().await.as_ref() {
1515 Self::perform_health_check(
1516 &service,
1517 sender.as_ref(),
1518 &service_health,
1519 &health_event_tx,
1520 &config,
1521 )
1522 .await;
1523 }
1524 }
1525 }
1526 })
1527 };
1528
1529 let heartbeat_task = {
1531 let network_sender = Arc::clone(&network_sender);
1532 let health_event_tx = health_event_tx.clone();
1533 let config = config.clone();
1534
1535 tokio::spawn(async move {
1536 let mut interval = tokio::time::interval(config.heartbeat_interval);
1537
1538 loop {
1539 interval.tick().await;
1540
1541 if let Some(sender) = network_sender.read().await.as_ref() {
1542 Self::send_heartbeat(sender.as_ref(), &health_event_tx).await;
1543 }
1544 }
1545 })
1546 };
1547
1548 let timeout_task = {
1550 let service_health = Arc::clone(&service_health);
1551 let health_event_tx = health_event_tx.clone();
1552 let config = config.clone();
1553
1554 tokio::spawn(async move {
1555 let mut interval = tokio::time::interval(Duration::from_secs(30)); loop {
1558 interval.tick().await;
1559
1560 Self::check_heartbeat_timeouts(&service_health, &health_event_tx, &config)
1561 .await;
1562 }
1563 })
1564 };
1565
1566 tokio::spawn(async move {
1568 tokio::select! {
1569 _ = health_check_task => debug!("Health check task completed"),
1570 _ = heartbeat_task => debug!("Heartbeat task completed"),
1571 _ = timeout_task => debug!("Timeout monitoring task completed"),
1572 }
1573 });
1574
1575 info!("Health monitoring started successfully");
1576 Ok(())
1577 }
1578
1579 async fn perform_health_check(
1581 service: &MCPService,
1582 network_sender: &dyn NetworkSender,
1583 service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1584 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1585 config: &HealthMonitorConfig,
1586 ) {
1587 let start_time = Instant::now();
1588 let service_id = service.service_id.clone();
1589 let peer_id = service.node_id.clone();
1590
1591 let health_check_message = MCPMessage::CallTool {
1594 name: "health_check".to_string(),
1595 arguments: json!({
1596 "service_id": service_id,
1597 "timestamp": SystemTime::now()
1598 .duration_since(SystemTime::UNIX_EPOCH)
1599 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
1600 .as_secs()
1601 }),
1602 };
1603
1604 let result = match serde_json::to_vec(&health_check_message) {
1606 Ok(data) => {
1607 timeout(
1608 config.health_check_timeout,
1609 network_sender.send_message(&peer_id, MCP_PROTOCOL, data),
1610 )
1611 .await
1612 }
1613 Err(e) => {
1614 debug!("Failed to serialize health check message: {}", e);
1615 return;
1616 }
1617 };
1618
1619 let response_time = start_time.elapsed();
1620 let success = result.as_ref().map(|r| r.is_ok()).unwrap_or(false);
1621
1622 let mut health_guard = service_health.write().await;
1624 let health = health_guard
1625 .entry(service_id.clone())
1626 .or_insert_with(|| ServiceHealth {
1627 service_id: service_id.clone(),
1628 status: ServiceHealthStatus::Unknown,
1629 last_health_check: None,
1630 last_heartbeat: None,
1631 failure_count: 0,
1632 success_count: 0,
1633 avg_response_time: Duration::from_millis(0),
1634 error_message: None,
1635 health_history: Vec::new(),
1636 });
1637
1638 let check_result = HealthCheckResult {
1640 timestamp: SystemTime::now(),
1641 success,
1642 response_time,
1643 error_message: if success {
1644 None
1645 } else {
1646 Some("Health check failed".to_string())
1647 },
1648 };
1649
1650 health.health_history.push(check_result);
1651 if health.health_history.len() > 10 {
1652 health.health_history.remove(0);
1653 }
1654
1655 let previous_status = health.status;
1657 if success {
1658 health.failure_count = 0;
1659 health.success_count += 1;
1660 health.last_health_check = Some(SystemTime::now());
1661
1662 if health.success_count >= config.success_threshold {
1663 health.status = ServiceHealthStatus::Healthy;
1664 health.error_message = None;
1665 }
1666 } else {
1667 health.success_count = 0;
1668 health.failure_count += 1;
1669
1670 if health.failure_count >= config.failure_threshold {
1671 health.status = ServiceHealthStatus::Unhealthy;
1672 health.error_message = Some("Health check failures exceeded threshold".to_string());
1673 }
1674 }
1675
1676 let total_time: Duration = health.health_history.iter().map(|h| h.response_time).sum();
1678 health.avg_response_time = total_time / health.health_history.len() as u32;
1679
1680 if previous_status != health.status {
1682 let event = match health.status {
1683 ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1684 service_id: service_id.clone(),
1685 peer_id: peer_id.clone(),
1686 },
1687 ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1688 service_id: service_id.clone(),
1689 peer_id: peer_id.clone(),
1690 error: health
1691 .error_message
1692 .clone()
1693 .unwrap_or_else(|| "Unknown error".to_string()),
1694 },
1695 ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1696 service_id: service_id.clone(),
1697 peer_id: peer_id.clone(),
1698 reason: "Performance degradation detected".to_string(),
1699 },
1700 _ => return, };
1702
1703 if let Err(e) = health_event_tx.send(event) {
1704 debug!("Failed to send health event: {}", e);
1705 }
1706 }
1707 }
1708
1709 async fn send_heartbeat(
1711 network_sender: &dyn NetworkSender,
1712 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1713 ) {
1714 let load = {
1717 0.2 };
1721
1722 let available_tools = vec!["query".to_string(), "update".to_string()]; let heartbeat = Heartbeat {
1727 service_id: "mcp-server".to_string(),
1728 peer_id: network_sender.local_peer_id().clone(),
1729 timestamp: SystemTime::now(),
1730 load,
1731 available_tools,
1732 capabilities: MCPCapabilities {
1733 experimental: None,
1734 sampling: None,
1735 tools: Some(MCPToolsCapability {
1736 list_changed: Some(true),
1737 }),
1738 prompts: None,
1739 resources: None,
1740 logging: None,
1741 },
1742 };
1743
1744 let heartbeat_message = MCPMessage::CallTool {
1746 name: "heartbeat".to_string(),
1747 arguments: serde_json::to_value(&heartbeat).unwrap_or(json!({})),
1748 };
1749
1750 if let Ok(_data) = serde_json::to_vec(&heartbeat_message) {
1751 debug!("Sending heartbeat for service: {}", heartbeat.service_id);
1754
1755 let event = HealthEvent::HeartbeatReceived {
1757 service_id: heartbeat.service_id.clone(),
1758 peer_id: heartbeat.peer_id.clone(),
1759 load: heartbeat.load,
1760 };
1761
1762 if let Err(e) = health_event_tx.send(event) {
1763 debug!("Failed to send heartbeat event: {}", e);
1764 }
1765 }
1766 }
1767
1768 async fn check_heartbeat_timeouts(
1770 service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1771 health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1772 config: &HealthMonitorConfig,
1773 ) {
1774 let now = SystemTime::now();
1775 let mut health_guard = service_health.write().await;
1776
1777 for (service_id, health) in health_guard.iter_mut() {
1778 if let Some(last_heartbeat) = health.last_heartbeat
1779 && let Ok(duration) = now.duration_since(last_heartbeat)
1780 && duration > config.heartbeat_timeout
1781 {
1782 let previous_status = health.status;
1783 health.status = ServiceHealthStatus::Unhealthy;
1784 health.error_message = Some("Heartbeat timeout".to_string());
1785
1786 if previous_status != ServiceHealthStatus::Unhealthy {
1788 let event = HealthEvent::HeartbeatTimeout {
1789 service_id: service_id.clone(),
1790 peer_id: PeerId::from("unknown".to_string()), };
1792
1793 if let Err(e) = health_event_tx.send(event) {
1794 debug!("Failed to send timeout event: {}", e);
1795 }
1796 }
1797 }
1798 }
1799 }
1800
1801 pub async fn get_stats(&self) -> MCPServerStats {
1803 self.stats.read().await.clone()
1804 }
1805
1806 pub async fn handle_heartbeat(&self, heartbeat: Heartbeat) -> Result<()> {
1808 let service_id = heartbeat.service_id.clone();
1809 let peer_id = heartbeat.peer_id.clone();
1810
1811 {
1813 let mut health_guard = self.service_health.write().await;
1814 let health = health_guard
1815 .entry(service_id.clone())
1816 .or_insert_with(|| ServiceHealth {
1817 service_id: service_id.clone(),
1818 status: ServiceHealthStatus::Healthy,
1819 last_health_check: None,
1820 last_heartbeat: None,
1821 failure_count: 0,
1822 success_count: 0,
1823 avg_response_time: Duration::from_millis(0),
1824 error_message: None,
1825 health_history: Vec::new(),
1826 });
1827
1828 health.last_heartbeat = Some(heartbeat.timestamp);
1829 health.status = ServiceHealthStatus::Healthy;
1830 health.failure_count = 0;
1831 health.error_message = None;
1832 }
1833
1834 let event = HealthEvent::HeartbeatReceived {
1836 service_id,
1837 peer_id,
1838 load: heartbeat.load,
1839 };
1840
1841 if let Err(e) = self.health_event_tx.send(event) {
1842 debug!("Failed to send heartbeat received event: {}", e);
1843 }
1844
1845 info!(
1846 "Heartbeat received from service: {} (load: {:.2})",
1847 heartbeat.service_id, heartbeat.load
1848 );
1849 Ok(())
1850 }
1851
1852 pub async fn get_service_health(&self, service_id: &str) -> Option<ServiceHealth> {
1854 let health_guard = self.service_health.read().await;
1855 health_guard.get(service_id).cloned()
1856 }
1857
1858 pub async fn get_all_service_health(&self) -> HashMap<String, ServiceHealth> {
1860 self.service_health.read().await.clone()
1861 }
1862
1863 pub async fn get_healthy_services(&self) -> Vec<String> {
1865 let health_guard = self.service_health.read().await;
1866 health_guard
1867 .iter()
1868 .filter(|(_, health)| health.status == ServiceHealthStatus::Healthy)
1869 .map(|(service_id, _)| service_id.clone())
1870 .collect()
1871 }
1872
1873 pub async fn update_service_health(
1875 &self,
1876 service_id: String,
1877 status: ServiceHealthStatus,
1878 error_message: Option<String>,
1879 ) {
1880 let mut health_guard = self.service_health.write().await;
1881 if let Some(health) = health_guard.get_mut(&service_id) {
1882 let previous_status = health.status;
1883 health.status = status;
1884 health.error_message = error_message.clone();
1885
1886 if previous_status != status {
1888 let event = match status {
1889 ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1890 service_id: service_id.clone(),
1891 peer_id: PeerId::from("manual".to_string()),
1892 },
1893 ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1894 service_id: service_id.clone(),
1895 peer_id: PeerId::from("manual".to_string()),
1896 error: error_message
1897 .unwrap_or_else(|| "Manually set to unhealthy".to_string()),
1898 },
1899 ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1900 service_id: service_id.clone(),
1901 peer_id: PeerId::from("manual".to_string()),
1902 reason: "Manually set to degraded".to_string(),
1903 },
1904 _ => return,
1905 };
1906
1907 if let Err(e) = self.health_event_tx.send(event) {
1908 debug!("Failed to send manual health update event: {}", e);
1909 }
1910 }
1911 }
1912 }
1913
1914 pub fn subscribe_health_events(&self) -> mpsc::UnboundedReceiver<HealthEvent> {
1916 let (_tx, rx) = mpsc::unbounded_channel();
1919 rx
1920 }
1921
1922 pub async fn is_service_healthy(&self, service_id: &str) -> bool {
1924 if let Some(health) = self.get_service_health(service_id).await {
1925 health.status == ServiceHealthStatus::Healthy
1926 } else {
1927 false
1928 }
1929 }
1930
1931 pub async fn get_service_load_info(&self) -> HashMap<String, f32> {
1933 let health_guard = self.service_health.read().await;
1936 health_guard
1937 .iter()
1938 .map(|(service_id, health)| {
1939 let load = match health.status {
1940 ServiceHealthStatus::Healthy => 0.1,
1941 ServiceHealthStatus::Degraded => 0.7,
1942 ServiceHealthStatus::Unhealthy => 1.0,
1943 ServiceHealthStatus::Disabled => 0.0,
1944 ServiceHealthStatus::Unknown => 0.5,
1945 };
1946 (service_id.clone(), load)
1947 })
1948 .collect()
1949 }
1950
1951 pub async fn call_remote_tool(
1953 &self,
1954 peer_id: &PeerId,
1955 tool_name: &str,
1956 arguments: Value,
1957 context: MCPCallContext,
1958 ) -> Result<Value> {
1959 let request_id = uuid::Uuid::new_v4().to_string();
1960
1961 let mcp_message = MCPMessage::CallTool {
1963 name: tool_name.to_string(),
1964 arguments,
1965 };
1966
1967 let p2p_message = P2PMCPMessage {
1969 message_type: P2PMCPMessageType::Request,
1970 message_id: request_id.clone(),
1971 source_peer: context.caller_id.clone(),
1972 target_peer: Some(peer_id.clone()),
1973 timestamp: SystemTime::now()
1974 .duration_since(std::time::UNIX_EPOCH)
1975 .map_err(|e| {
1976 P2PError::Identity(crate::error::IdentityError::SystemTime(
1977 format!("Time error: {e}").into(),
1978 ))
1979 })?
1980 .as_secs(),
1981 payload: mcp_message,
1982 ttl: 5, };
1984
1985 let message_data = serde_json::to_vec(&p2p_message)
1987 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1988
1989 if message_data.len() > MAX_MESSAGE_SIZE {
1990 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1991 "Message too large".to_string().into(),
1992 )));
1993 }
1994
1995 let (response_tx, _response_rx) = oneshot::channel::<MCPResponse>();
1997
1998 {
2000 let mut handlers = self.request_handlers.write().await;
2001 handlers.insert(request_id.clone(), response_tx);
2002 }
2003
2004 if let Some(ref network_sender) = *self.network_sender.read().await {
2006 network_sender
2008 .send_message(peer_id, MCP_PROTOCOL, message_data)
2009 .await?;
2010
2011 debug!(
2014 "MCP remote tool call sent to peer {}, tool: {}",
2015 peer_id, tool_name
2016 );
2017
2018 Ok(json!({
2021 "status": "sent",
2022 "message": "Remote tool call sent successfully",
2023 "peer_id": peer_id,
2024 "tool_name": tool_name
2025 }))
2026 } else {
2027 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2028 "Network sender not configured".to_string().into(),
2029 )))
2030 }
2031 }
2032
2033 pub async fn handle_p2p_message(
2035 &self,
2036 message_data: &[u8],
2037 source_peer: &PeerId,
2038 ) -> Result<Option<Vec<u8>>> {
2039 let p2p_message: P2PMCPMessage = serde_json::from_slice(message_data)
2041 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2042
2043 debug!(
2044 "Received MCP message from {}: {:?}",
2045 source_peer, p2p_message.message_type
2046 );
2047
2048 if let MCPMessage::CallTool { name, arguments } = &p2p_message.payload {
2050 if name == "heartbeat" {
2051 if let Ok(heartbeat) = serde_json::from_value::<Heartbeat>(arguments.clone()) {
2052 self.handle_heartbeat(heartbeat).await?;
2053 return Ok(None);
2054 }
2055 } else if name == "health_check" {
2056 let health_response = MCPMessage::CallToolResult {
2058 content: vec![],
2059 is_error: false,
2060 };
2061
2062 let response_message = P2PMCPMessage {
2063 message_type: P2PMCPMessageType::Response,
2064 message_id: p2p_message.message_id.clone(),
2065 source_peer: source_peer.clone(),
2066 target_peer: Some(p2p_message.source_peer.clone()),
2067 timestamp: SystemTime::now()
2068 .duration_since(SystemTime::UNIX_EPOCH)
2069 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
2070 .as_secs(),
2071 payload: health_response,
2072 ttl: 3,
2073 };
2074
2075 return Ok(Some(serde_json::to_vec(&response_message)?));
2076 }
2077 }
2078
2079 match p2p_message.message_type {
2080 P2PMCPMessageType::Request => self.handle_remote_request(p2p_message).await,
2081 P2PMCPMessageType::Response => {
2082 self.handle_remote_response(p2p_message).await?;
2083 Ok(None) }
2085 P2PMCPMessageType::ServiceAdvertisement => {
2086 self.handle_service_advertisement(p2p_message).await?;
2087 Ok(None)
2088 }
2089 P2PMCPMessageType::ServiceDiscovery => self.handle_service_discovery(p2p_message).await,
2090 P2PMCPMessageType::Heartbeat => {
2091 debug!("Received heartbeat message");
2092 Ok(None)
2093 }
2094 P2PMCPMessageType::HealthCheck => {
2095 debug!("Received health check message");
2096 Ok(None)
2097 }
2098 }
2099 }
2100
2101 async fn handle_remote_request(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
2103 match message.payload {
2104 MCPMessage::CallTool { name, arguments } => {
2105 let context = MCPCallContext {
2106 caller_id: message.source_peer.clone(),
2107 timestamp: SystemTime::now(),
2108 timeout: DEFAULT_CALL_TIMEOUT,
2109 auth_info: None,
2110 metadata: HashMap::new(),
2111 };
2112
2113 let result = self.call_tool(&name, arguments, context).await;
2115
2116 let response_payload = match result {
2118 Ok(value) => MCPMessage::CallToolResult {
2119 content: vec![MCPContent::Text {
2120 text: value.to_string(),
2121 }],
2122 is_error: false,
2123 },
2124 Err(e) => MCPMessage::Error {
2125 code: -1,
2126 message: e.to_string(),
2127 data: None,
2128 },
2129 };
2130
2131 let response_message = P2PMCPMessage {
2132 message_type: P2PMCPMessageType::Response,
2133 message_id: message.message_id,
2134 source_peer: self.get_node_id_string(),
2135 target_peer: Some(message.source_peer),
2136 timestamp: SystemTime::now()
2137 .duration_since(std::time::UNIX_EPOCH)
2138 .map_err(|e| {
2139 P2PError::Identity(crate::error::IdentityError::SystemTime(
2140 format!("Time error: {e}").into(),
2141 ))
2142 })?
2143 .as_secs(),
2144 payload: response_payload,
2145 ttl: message.ttl.saturating_sub(1),
2146 };
2147
2148 let response_data = serde_json::to_vec(&response_message)
2150 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2151
2152 Ok(Some(response_data))
2153 }
2154 MCPMessage::ListTools { cursor: _ } => {
2155 let (tools, _) = self.list_tools(None).await?;
2156
2157 let response_payload = MCPMessage::ListToolsResult {
2158 tools,
2159 next_cursor: None,
2160 };
2161
2162 let response_message = P2PMCPMessage {
2163 message_type: P2PMCPMessageType::Response,
2164 message_id: message.message_id,
2165 source_peer: self.get_node_id_string(),
2166 target_peer: Some(message.source_peer),
2167 timestamp: SystemTime::now()
2168 .duration_since(std::time::UNIX_EPOCH)
2169 .map_err(|e| {
2170 P2PError::Identity(crate::error::IdentityError::SystemTime(
2171 format!("Time error: {e}").into(),
2172 ))
2173 })?
2174 .as_secs(),
2175 payload: response_payload,
2176 ttl: message.ttl.saturating_sub(1),
2177 };
2178
2179 let response_data = serde_json::to_vec(&response_message)
2180 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2181
2182 Ok(Some(response_data))
2183 }
2184 _ => {
2185 let error_response = P2PMCPMessage {
2187 message_type: P2PMCPMessageType::Response,
2188 message_id: message.message_id,
2189 source_peer: self.get_node_id_string(),
2190 target_peer: Some(message.source_peer),
2191 timestamp: SystemTime::now()
2192 .duration_since(std::time::UNIX_EPOCH)
2193 .map_err(|e| {
2194 P2PError::Identity(crate::error::IdentityError::SystemTime(
2195 format!("Time error: {e}").into(),
2196 ))
2197 })?
2198 .as_secs(),
2199 payload: MCPMessage::Error {
2200 code: -2,
2201 message: "Unsupported request type".to_string(),
2202 data: None,
2203 },
2204 ttl: message.ttl.saturating_sub(1),
2205 };
2206
2207 let response_data = serde_json::to_vec(&error_response)
2208 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2209
2210 Ok(Some(response_data))
2211 }
2212 }
2213 }
2214
2215 pub async fn generate_auth_token(
2219 &self,
2220 peer_id: &PeerId,
2221 permissions: Vec<MCPPermission>,
2222 ttl: Duration,
2223 ) -> Result<String> {
2224 if let Some(security_manager) = &self.security_manager {
2225 let token = security_manager
2226 .generate_token(peer_id, permissions, ttl)
2227 .await?;
2228
2229 let mut details = HashMap::new();
2231 details.insert("action".to_string(), "token_generated".to_string());
2232 details.insert("ttl_seconds".to_string(), ttl.as_secs().to_string());
2233
2234 self.audit_logger
2235 .log_event(
2236 "authentication".to_string(),
2237 peer_id.clone(),
2238 details,
2239 AuditSeverity::Info,
2240 )
2241 .await;
2242
2243 Ok(token)
2244 } else {
2245 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2246 "Authentication not enabled".to_string().into(),
2247 )))
2248 }
2249 }
2250
2251 pub async fn verify_auth_token(&self, token: &str) -> Result<TokenPayload> {
2253 if let Some(security_manager) = &self.security_manager {
2254 match security_manager.verify_token(token).await {
2255 Ok(payload) => {
2256 let mut details = HashMap::new();
2258 details.insert("action".to_string(), "token_verified".to_string());
2259 details.insert("subject".to_string(), payload.sub.clone());
2260
2261 self.audit_logger
2262 .log_event(
2263 "authentication".to_string(),
2264 payload.iss.clone(),
2265 details,
2266 AuditSeverity::Info,
2267 )
2268 .await;
2269
2270 Ok(payload)
2271 }
2272 Err(e) => {
2273 let mut details = HashMap::new();
2275 details.insert(
2276 "action".to_string(),
2277 "token_verification_failed".to_string(),
2278 );
2279 details.insert("error".to_string(), e.to_string());
2280
2281 self.audit_logger
2282 .log_event(
2283 "authentication".to_string(),
2284 "unknown".to_string(),
2285 details,
2286 AuditSeverity::Warning,
2287 )
2288 .await;
2289
2290 Err(e)
2291 }
2292 }
2293 } else {
2294 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2295 "Authentication not enabled".to_string().into(),
2296 )))
2297 }
2298 }
2299
2300 pub async fn check_permission(
2302 &self,
2303 peer_id: &PeerId,
2304 permission: &MCPPermission,
2305 ) -> Result<bool> {
2306 if let Some(security_manager) = &self.security_manager {
2307 security_manager.check_permission(peer_id, permission).await
2308 } else {
2309 Ok(true)
2311 }
2312 }
2313
2314 pub async fn check_rate_limit(&self, peer_id: &PeerId) -> Result<bool> {
2316 if let Some(security_manager) = &self.security_manager {
2317 let allowed = security_manager.check_rate_limit(peer_id).await?;
2318
2319 if !allowed {
2320 let mut details = HashMap::new();
2322 details.insert("action".to_string(), "rate_limit_exceeded".to_string());
2323
2324 self.audit_logger
2325 .log_event(
2326 "rate_limiting".to_string(),
2327 peer_id.clone(),
2328 details,
2329 AuditSeverity::Warning,
2330 )
2331 .await;
2332 }
2333
2334 Ok(allowed)
2335 } else {
2336 Ok(true)
2338 }
2339 }
2340
2341 pub async fn grant_permission(
2343 &self,
2344 peer_id: &PeerId,
2345 permission: MCPPermission,
2346 ) -> Result<()> {
2347 if let Some(security_manager) = &self.security_manager {
2348 security_manager
2349 .grant_permission(peer_id, permission.clone())
2350 .await?;
2351
2352 let mut details = HashMap::new();
2354 details.insert("action".to_string(), "permission_granted".to_string());
2355 details.insert("permission".to_string(), permission.as_str().to_string());
2356
2357 self.audit_logger
2358 .log_event(
2359 "authorization".to_string(),
2360 peer_id.clone(),
2361 details,
2362 AuditSeverity::Info,
2363 )
2364 .await;
2365
2366 Ok(())
2367 } else {
2368 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2369 "Security not enabled".to_string().into(),
2370 )))
2371 }
2372 }
2373
2374 pub async fn revoke_permission(
2376 &self,
2377 peer_id: &PeerId,
2378 permission: &MCPPermission,
2379 ) -> Result<()> {
2380 if let Some(security_manager) = &self.security_manager {
2381 security_manager
2382 .revoke_permission(peer_id, permission)
2383 .await?;
2384
2385 let mut details = HashMap::new();
2387 details.insert("action".to_string(), "permission_revoked".to_string());
2388 details.insert("permission".to_string(), permission.as_str().to_string());
2389
2390 self.audit_logger
2391 .log_event(
2392 "authorization".to_string(),
2393 peer_id.clone(),
2394 details,
2395 AuditSeverity::Info,
2396 )
2397 .await;
2398
2399 Ok(())
2400 } else {
2401 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2402 "Security not enabled".to_string().into(),
2403 )))
2404 }
2405 }
2406
2407 pub async fn add_trusted_peer(&self, peer_id: PeerId) -> Result<()> {
2409 if let Some(security_manager) = &self.security_manager {
2410 security_manager.add_trusted_peer(peer_id.clone()).await?;
2411
2412 let mut details = HashMap::new();
2414 details.insert("action".to_string(), "trusted_peer_added".to_string());
2415
2416 self.audit_logger
2417 .log_event(
2418 "trust_management".to_string(),
2419 peer_id,
2420 details,
2421 AuditSeverity::Info,
2422 )
2423 .await;
2424
2425 Ok(())
2426 } else {
2427 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2428 "Security not enabled".to_string().into(),
2429 )))
2430 }
2431 }
2432
2433 pub async fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
2435 if let Some(security_manager) = &self.security_manager {
2436 security_manager.is_trusted_peer(peer_id).await
2437 } else {
2438 false
2439 }
2440 }
2441
2442 pub async fn set_tool_security_policy(
2444 &self,
2445 tool_name: String,
2446 level: SecurityLevel,
2447 ) -> Result<()> {
2448 if let Some(security_manager) = &self.security_manager {
2449 security_manager
2450 .set_tool_policy(tool_name.clone(), level.clone())
2451 .await?;
2452
2453 let mut details = HashMap::new();
2455 details.insert("action".to_string(), "tool_policy_set".to_string());
2456 details.insert("tool_name".to_string(), tool_name);
2457 details.insert("security_level".to_string(), format!("{level:?}"));
2458
2459 self.audit_logger
2460 .log_event(
2461 "security_policy".to_string(),
2462 "system".to_string(),
2463 details,
2464 AuditSeverity::Info,
2465 )
2466 .await;
2467
2468 Ok(())
2469 } else {
2470 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2471 "Security not enabled".to_string().into(),
2472 )))
2473 }
2474 }
2475
2476 pub async fn get_tool_security_policy(&self, tool_name: &str) -> SecurityLevel {
2478 if let Some(security_manager) = &self.security_manager {
2479 security_manager.get_tool_policy(tool_name).await
2480 } else {
2481 SecurityLevel::Public
2482 }
2483 }
2484
2485 pub async fn get_peer_security_stats(&self, peer_id: &PeerId) -> Option<PeerACL> {
2487 if let Some(security_manager) = &self.security_manager {
2488 security_manager.get_peer_stats(peer_id).await
2489 } else {
2490 None
2491 }
2492 }
2493
2494 pub async fn get_security_audit(&self, limit: Option<usize>) -> Vec<SecurityAuditEntry> {
2496 self.audit_logger.get_recent_entries(limit).await
2497 }
2498
2499 pub async fn security_cleanup(&self) -> Result<()> {
2501 if let Some(security_manager) = &self.security_manager {
2502 security_manager.cleanup().await?;
2503 }
2504 Ok(())
2505 }
2506
2507 async fn handle_remote_response(&self, message: P2PMCPMessage) -> Result<()> {
2509 let response_tx = {
2511 let mut handlers = self.request_handlers.write().await;
2512 handlers.remove(&message.message_id)
2513 };
2514
2515 if let Some(tx) = response_tx {
2516 let response = MCPResponse {
2517 request_id: message.message_id,
2518 message: message.payload,
2519 timestamp: SystemTime::now(),
2520 processing_time: Duration::from_millis(0), };
2522
2523 let _ = tx.send(response);
2525 } else {
2526 debug!(
2527 "Received response for unknown request: {}",
2528 message.message_id
2529 );
2530 }
2531
2532 Ok(())
2533 }
2534
2535 pub async fn announce_local_services(&self) -> Result<()> {
2537 if let Some(dht) = &self.dht {
2538 let local_service = self.create_local_service_announcement().await?;
2540
2541 self.store_service_in_dht(&local_service, dht).await?;
2543
2544 if let Some(network_sender) = &*self.network_sender.read().await {
2546 self.broadcast_service_announcement(&local_service, network_sender)
2547 .await?;
2548 }
2549
2550 info!(
2551 "Announced local MCP service with {} tools",
2552 local_service.tools.len()
2553 );
2554 }
2555
2556 Ok(())
2557 }
2558
2559 async fn create_local_service_announcement(&self) -> Result<MCPService> {
2561 let tools = self.tools.read().await;
2562 let tool_names: Vec<String> = tools.keys().cloned().collect();
2563
2564 let service = MCPService {
2565 service_id: format!("mcp-{}", self.config.server_name),
2566 node_id: "local".to_string(), tools: tool_names,
2568 capabilities: MCPCapabilities {
2569 experimental: None,
2570 sampling: None,
2571 tools: Some(MCPToolsCapability {
2572 list_changed: Some(true),
2573 }),
2574 prompts: None,
2575 resources: None,
2576 logging: None,
2577 },
2578 metadata: MCPServiceMetadata {
2579 name: self.config.server_name.clone(),
2580 version: self.config.server_version.clone(),
2581 description: Some("P2P MCP Service".to_string()),
2582 tags: vec!["p2p".to_string(), "mcp".to_string()],
2583 health_status: ServiceHealthStatus::Healthy,
2584 load_metrics: self.get_current_load_metrics().await,
2585 },
2586 registered_at: SystemTime::now(),
2587 endpoint: MCPEndpoint {
2588 protocol: "p2p".to_string(),
2589 address: "local".to_string(), port: None,
2591 tls: true,
2592 auth_required: false,
2593 },
2594 };
2595
2596 Ok(service)
2597 }
2598
2599 async fn get_current_load_metrics(&self) -> ServiceLoadMetrics {
2601 let stats = self.stats.read().await;
2602
2603 ServiceLoadMetrics {
2604 active_requests: self.request_handlers.read().await.len() as u32,
2605 requests_per_second: stats.total_requests as f64 / 60.0, avg_response_time_ms: stats.avg_response_time.as_millis() as f64,
2607 error_rate: if stats.total_requests > 0 {
2608 stats.total_errors as f64 / stats.total_requests as f64
2609 } else {
2610 0.0
2611 },
2612 cpu_usage: 0.0, memory_usage: 0, }
2615 }
2616
2617 async fn store_service_in_dht(
2619 &self,
2620 service: &MCPService,
2621 dht: &Arc<RwLock<DHT>>,
2622 ) -> Result<()> {
2623 let hash = blake3::hash(format!("mcp:service:{}", service.service_id).as_bytes());
2625 let service_key: Key = *hash.as_bytes();
2626 let service_data = serde_json::to_vec(service)
2627 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2628
2629 let mut dht_guard = dht.write().await;
2630 dht_guard
2631 .store(&DhtKey::from_bytes(service_key), service_data)
2632 .await
2633 .map_err(|e| {
2634 P2PError::Dht(crate::error::DhtError::StoreFailed(
2635 format!("Failed to store service: {e}").into(),
2636 ))
2637 })?;
2638
2639 let hash = blake3::hash(b"mcp:services:index");
2641 let services_index_key: Key = *hash.as_bytes();
2642 let mut service_ids = match dht_guard
2643 .retrieve(&DhtKey::from_bytes(services_index_key))
2644 .await
2645 {
2646 Ok(Some(value)) => serde_json::from_slice::<Vec<String>>(&value).unwrap_or_default(),
2647 Ok(None) | Err(_) => Vec::new(),
2648 };
2649
2650 if !service_ids.contains(&service.service_id) {
2651 service_ids.push(service.service_id.clone());
2652
2653 let index_data = serde_json::to_vec(&service_ids)
2654 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2655
2656 dht_guard
2657 .store(&DhtKey::from_bytes(services_index_key), index_data)
2658 .await
2659 .map_err(|e| {
2660 P2PError::Dht(crate::error::DhtError::StoreFailed(
2661 format!("Failed to update services index: {e}").into(),
2662 ))
2663 })?;
2664 }
2665
2666 Ok(())
2667 }
2668
2669 async fn broadcast_service_announcement(
2671 &self,
2672 service: &MCPService,
2673 network_sender: &Arc<dyn NetworkSender>,
2674 ) -> Result<()> {
2675 let announcement = P2PMCPMessage {
2676 message_type: P2PMCPMessageType::ServiceAdvertisement,
2677 message_id: uuid::Uuid::new_v4().to_string(),
2678 source_peer: network_sender.local_peer_id().clone(),
2679 target_peer: None, timestamp: SystemTime::now()
2681 .duration_since(std::time::UNIX_EPOCH)
2682 .unwrap_or_default()
2683 .as_secs(),
2684 payload: MCPMessage::ListToolsResult {
2685 tools: service
2686 .tools
2687 .iter()
2688 .map(|tool_name| MCPTool {
2689 name: tool_name.clone(),
2690 description: format!("Tool from {}", service.metadata.name),
2691 input_schema: json!({"type": "object"}),
2692 })
2693 .collect(),
2694 next_cursor: None,
2695 },
2696 ttl: 3,
2697 };
2698
2699 let _announcement_data = serde_json::to_vec(&announcement)
2700 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2701
2702 debug!(
2705 "Service announcement prepared for broadcast: {} tools",
2706 service.tools.len()
2707 );
2708
2709 Ok(())
2710 }
2711
2712 pub async fn discover_remote_services(&self) -> Result<Vec<MCPService>> {
2714 if let Some(dht) = &self.dht {
2715 let hash = blake3::hash(b"mcp:services:index");
2716 let services_index_key: Key = *hash.as_bytes();
2717 let dht_guard = dht.read().await;
2718
2719 let service_ids = match dht_guard
2720 .retrieve(&DhtKey::from_bytes(services_index_key))
2721 .await
2722 {
2723 Ok(Some(value)) => {
2724 serde_json::from_slice::<Vec<String>>(&value).unwrap_or_default()
2725 }
2726 Ok(None) | Err(_) => {
2727 debug!("No services index found in DHT");
2728 return Ok(Vec::new());
2729 }
2730 };
2731
2732 let mut discovered_services = Vec::new();
2733
2734 for service_id in service_ids {
2735 let hash = blake3::hash(format!("mcp:service:{service_id}").as_bytes());
2736 let service_key: Key = *hash.as_bytes();
2737
2738 if let Ok(Some(value)) = dht_guard.retrieve(&DhtKey::from_bytes(service_key)).await
2739 {
2740 match serde_json::from_slice::<MCPService>(&value) {
2741 Ok(service) => {
2742 if service.service_id != format!("mcp-{}", self.config.server_name) {
2744 discovered_services.push(service);
2745 }
2746 }
2747 Err(e) => {
2748 warn!("Failed to deserialize service {}: {}", service_id, e);
2749 }
2750 }
2751 }
2752 }
2753
2754 debug!(
2755 "Discovered {} remote MCP services",
2756 discovered_services.len()
2757 );
2758 Ok(discovered_services)
2759 } else {
2760 Ok(Vec::new())
2761 }
2762 }
2763
2764 pub async fn refresh_service_discovery(&self) -> Result<()> {
2766 let discovered_services = self.discover_remote_services().await?;
2767
2768 {
2770 let mut remote_cache = self.remote_services.write().await;
2771 remote_cache.clear();
2772
2773 for service in discovered_services {
2774 remote_cache.insert(service.service_id.clone(), service);
2775 }
2776 }
2777
2778 {
2780 let local_service = self.create_local_service_announcement().await?;
2781 let mut local_cache = self.local_services.write().await;
2782 local_cache.insert(local_service.service_id.clone(), local_service);
2783 }
2784
2785 debug!("Service discovery refresh completed");
2786 Ok(())
2787 }
2788
2789 pub async fn get_all_services(&self) -> Result<Vec<MCPService>> {
2791 let mut all_services = Vec::new();
2792
2793 {
2795 let local_services = self.local_services.read().await;
2796 all_services.extend(local_services.values().cloned());
2797 }
2798
2799 {
2801 let remote_services = self.remote_services.read().await;
2802 all_services.extend(remote_services.values().cloned());
2803 }
2804
2805 Ok(all_services)
2806 }
2807
2808 pub async fn find_services_with_tool(&self, tool_name: &str) -> Result<Vec<MCPService>> {
2810 let all_services = self.get_all_services().await?;
2811
2812 let matching_services = all_services
2813 .into_iter()
2814 .filter(|service| service.tools.contains(&tool_name.to_string()))
2815 .collect();
2816
2817 Ok(matching_services)
2818 }
2819
2820 pub async fn handle_service_advertisement(&self, message: P2PMCPMessage) -> Result<()> {
2822 debug!(
2823 "Received service advertisement from peer: {}",
2824 message.source_peer
2825 );
2826
2827 if let MCPMessage::ListToolsResult { tools, .. } = message.payload {
2829 let service = MCPService {
2831 service_id: format!("mcp-{}", message.source_peer),
2832 node_id: message.source_peer.clone(),
2833 tools: tools.iter().map(|t| t.name.clone()).collect(),
2834 capabilities: MCPCapabilities {
2835 experimental: None,
2836 sampling: None,
2837 tools: Some(MCPToolsCapability {
2838 list_changed: Some(true),
2839 }),
2840 prompts: None,
2841 resources: None,
2842 logging: None,
2843 },
2844 metadata: MCPServiceMetadata {
2845 name: format!("Remote MCP Service - {}", message.source_peer),
2846 version: "unknown".to_string(),
2847 description: Some("Remote P2P MCP Service".to_string()),
2848 tags: vec!["p2p".to_string(), "remote".to_string()],
2849 health_status: ServiceHealthStatus::Healthy,
2850 load_metrics: ServiceLoadMetrics {
2851 active_requests: 0,
2852 requests_per_second: 0.0,
2853 avg_response_time_ms: 0.0,
2854 error_rate: 0.0,
2855 cpu_usage: 0.0,
2856 memory_usage: 0,
2857 },
2858 },
2859 registered_at: SystemTime::now(),
2860 endpoint: MCPEndpoint {
2861 protocol: "p2p".to_string(),
2862 address: message.source_peer.clone(),
2863 port: None,
2864 tls: true,
2865 auth_required: false,
2866 },
2867 };
2868
2869 {
2871 let mut remote_services = self.remote_services.write().await;
2872 remote_services.insert(service.service_id.clone(), service.clone());
2873 }
2874
2875 if let Some(dht) = &self.dht
2877 && let Err(e) = self.store_service_in_dht(&service, dht).await
2878 {
2879 warn!("Failed to store remote service in DHT: {}", e);
2880 }
2881
2882 info!(
2883 "Registered remote MCP service from {} with {} tools",
2884 message.source_peer,
2885 tools.len()
2886 );
2887 }
2888
2889 Ok(())
2890 }
2891
2892 pub async fn handle_service_discovery(
2894 &self,
2895 message: P2PMCPMessage,
2896 ) -> Result<Option<Vec<u8>>> {
2897 let local_services: Vec<MCPService> = {
2899 let services = self.local_services.read().await;
2900 services.values().cloned().collect()
2901 };
2902
2903 if !local_services.is_empty() {
2904 let advertisement = P2PMCPMessage {
2905 message_type: P2PMCPMessageType::ServiceAdvertisement,
2906 message_id: uuid::Uuid::new_v4().to_string(),
2907 source_peer: self.get_node_id_string(),
2908 target_peer: Some(message.source_peer),
2909 timestamp: SystemTime::now()
2910 .duration_since(std::time::UNIX_EPOCH)
2911 .map_err(|e| {
2912 P2PError::Identity(crate::error::IdentityError::SystemTime(
2913 format!("Time error: {e}").into(),
2914 ))
2915 })?
2916 .as_secs(),
2917 payload: MCPMessage::ListToolsResult {
2918 tools: local_services
2919 .into_iter()
2920 .flat_map(|s| {
2921 s.tools.into_iter().map(|t| MCPTool {
2922 name: t,
2923 description: "Remote tool".to_string(),
2924 input_schema: json!({"type": "object"}),
2925 })
2926 })
2927 .collect(),
2928 next_cursor: None,
2929 },
2930 ttl: message.ttl.saturating_sub(1),
2931 };
2932
2933 let response_data = serde_json::to_vec(&advertisement)
2934 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2935
2936 Ok(Some(response_data))
2937 } else {
2938 Ok(None)
2939 }
2940 }
2941
2942 pub async fn shutdown(&self) -> Result<()> {
2944 info!("Shutting down MCP server");
2945
2946 {
2948 let mut sessions = self.sessions.write().await;
2949 for session in sessions.values_mut() {
2950 session.state = MCPSessionState::Terminated;
2951 }
2952 sessions.clear();
2953 }
2954
2955 info!("MCP server shutdown complete");
2958 Ok(())
2959 }
2960}
2961
2962impl Tool {
2963 pub fn builder(name: &str, description: &str, input_schema: Value) -> ToolBuilder {
2965 ToolBuilder {
2966 name: name.to_string(),
2967 description: description.to_string(),
2968 input_schema,
2969 handler: None,
2970 tags: Vec::new(),
2971 }
2972 }
2973}
2974
2975pub struct ToolBuilder {
2977 name: String,
2978 description: String,
2979 input_schema: Value,
2980 handler: Option<Box<dyn ToolHandler + Send + Sync>>,
2981 tags: Vec<String>,
2982}
2983
2984impl ToolBuilder {
2985 pub fn handler<H: ToolHandler + Send + Sync + 'static>(mut self, handler: H) -> Self {
2987 self.handler = Some(Box::new(handler));
2988 self
2989 }
2990
2991 pub fn tags(mut self, tags: Vec<String>) -> Self {
2993 self.tags = tags;
2994 self
2995 }
2996
2997 pub fn build(self) -> Result<Tool> {
2999 let handler = self.handler.ok_or_else(|| {
3000 P2PError::Mcp(crate::error::McpError::InvalidRequest(
3001 "Tool handler is required".to_string().into(),
3002 ))
3003 })?;
3004
3005 let definition = MCPTool {
3006 name: self.name,
3007 description: self.description,
3008 input_schema: self.input_schema,
3009 };
3010
3011 let metadata = ToolMetadata {
3012 created_at: SystemTime::now(),
3013 last_called: None,
3014 call_count: 0,
3015 avg_execution_time: Duration::from_millis(0),
3016 health_status: ToolHealthStatus::Healthy,
3017 tags: self.tags,
3018 };
3019
3020 Ok(Tool {
3021 definition,
3022 handler,
3023 metadata,
3024 })
3025 }
3026}
3027
/// Adapter that wraps a plain function or closure so it can act as a tool
/// handler.
pub struct FunctionToolHandler<F> {
    /// The wrapped callable, invoked once per `execute` call.
    function: F,
}
3032
3033impl<F, Fut> ToolHandler for FunctionToolHandler<F>
3034where
3035 F: Fn(Value) -> Fut + Send + Sync,
3036 Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
3037{
3038 fn execute(
3039 &self,
3040 arguments: Value,
3041 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>> {
3042 Box::pin((self.function)(arguments))
3043 }
3044}
3045
3046impl<F> FunctionToolHandler<F> {
3047 pub fn new(function: F) -> Self {
3049 Self { function }
3050 }
3051}
3052
3053impl MCPService {
3055 pub fn new(service_id: String, node_id: PeerId) -> Self {
3057 Self {
3058 service_id,
3059 node_id,
3060 tools: Vec::new(),
3061 capabilities: MCPCapabilities {
3062 experimental: None,
3063 sampling: None,
3064 tools: Some(MCPToolsCapability {
3065 list_changed: Some(true),
3066 }),
3067 prompts: None,
3068 resources: None,
3069 logging: None,
3070 },
3071 metadata: MCPServiceMetadata {
3072 name: "MCP Service".to_string(),
3073 version: "1.0.0".to_string(),
3074 description: None,
3075 tags: Vec::new(),
3076 health_status: ServiceHealthStatus::Healthy,
3077 load_metrics: ServiceLoadMetrics {
3078 active_requests: 0,
3079 requests_per_second: 0.0,
3080 avg_response_time_ms: 0.0,
3081 error_rate: 0.0,
3082 cpu_usage: 0.0,
3083 memory_usage: 0,
3084 },
3085 },
3086 registered_at: SystemTime::now(),
3087 endpoint: MCPEndpoint {
3088 protocol: "p2p".to_string(),
3089 address: "".to_string(),
3090 port: None,
3091 tls: false,
3092 auth_required: false,
3093 },
3094 }
3095 }
3096}
3097
3098impl Default for MCPCapabilities {
3099 fn default() -> Self {
3100 Self {
3101 experimental: None,
3102 sampling: None,
3103 tools: Some(MCPToolsCapability {
3104 list_changed: Some(true),
3105 }),
3106 prompts: Some(MCPPromptsCapability {
3107 list_changed: Some(true),
3108 }),
3109 resources: Some(MCPResourcesCapability {
3110 subscribe: Some(true),
3111 list_changed: Some(true),
3112 }),
3113 logging: Some(MCPLoggingCapability {
3114 levels: Some(vec![
3115 MCPLogLevel::Debug,
3116 MCPLogLevel::Info,
3117 MCPLogLevel::Warning,
3118 MCPLogLevel::Error,
3119 ]),
3120 }),
3121 }
3122 }
3123}
3124
3125#[cfg(test)]
3126mod tests {
3127 use super::*;
3128 use crate::dht::DHT;
3129 use std::future::Future;
3130 use std::pin::Pin;
3131 use std::time::UNIX_EPOCH;
3132 use tokio::time::timeout;
3133
/// Configurable tool handler used by the tests in this module.
struct TestTool {
    /// Name echoed in results and error messages.
    name: String,
    /// When `true`, `execute` fails instead of returning a result.
    should_error: bool,
    /// How long `execute` sleeps before finishing.
    execution_time: Duration,
}
3140
3141 impl TestTool {
3142 fn new(name: &str) -> Self {
3143 Self {
3144 name: name.to_string(),
3145 should_error: false,
3146 execution_time: Duration::from_millis(10),
3147 }
3148 }
3149
3150 fn with_error(mut self) -> Self {
3151 self.should_error = true;
3152 self
3153 }
3154
3155 fn with_execution_time(mut self, duration: Duration) -> Self {
3156 self.execution_time = duration;
3157 self
3158 }
3159 }
3160
3161 impl ToolHandler for TestTool {
3162 fn execute(
3163 &self,
3164 arguments: Value,
3165 ) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
3166 let should_error = self.should_error;
3167 let execution_time = self.execution_time;
3168 let name = self.name.clone();
3169
3170 Box::pin(async move {
3171 tokio::time::sleep(execution_time).await;
3172
3173 if should_error {
3174 return Err(P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
3175 format!("{}: Test error", name).into(),
3176 )));
3177 }
3178
3179 Ok(json!({
3181 "tool": name,
3182 "arguments": arguments,
3183 "result": "success"
3184 }))
3185 })
3186 }
3187
3188 fn validate(&self, arguments: &Value) -> Result<()> {
3189 if !arguments.is_object() {
3190 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
3191 "Arguments must be an object".to_string().into(),
3192 )));
3193 }
3194 Ok(())
3195 }
3196
3197 fn get_requirements(&self) -> ToolRequirements {
3198 ToolRequirements {
3199 max_memory: Some(1024 * 1024), max_execution_time: Some(Duration::from_secs(5)),
3201 required_capabilities: vec!["test".to_string()],
3202 requires_network: false,
3203 requires_filesystem: false,
3204 }
3205 }
3206 }
3207
3208 async fn create_test_mcp_server() -> MCPServer {
3210 let config = MCPServerConfig {
3211 server_name: "test_server".to_string(),
3212 server_version: "1.0.0".to_string(),
3213 enable_auth: false,
3214 enable_rate_limiting: false,
3215 max_concurrent_requests: 10,
3216 request_timeout: Duration::from_secs(30),
3217 enable_dht_discovery: true,
3218 rate_limit_rpm: 60,
3219 enable_logging: true,
3220 max_tool_execution_time: Duration::from_secs(30),
3221 tool_memory_limit: 100 * 1024 * 1024,
3222 health_monitor: HealthMonitorConfig::default(),
3223 };
3224
3225 MCPServer::new(config)
3226 }
3227
3228 fn create_test_tool(name: &str) -> Tool {
3230 Tool {
3231 definition: MCPTool {
3232 name: name.to_string(),
3233 description: format!("Test tool: {}", name).into(),
3234 input_schema: json!({
3235 "type": "object",
3236 "properties": {
3237 "input": { "type": "string" }
3238 }
3239 }),
3240 },
3241 handler: Box::new(TestTool::new(name)),
3242 metadata: ToolMetadata {
3243 created_at: SystemTime::now(),
3244 last_called: None,
3245 call_count: 0,
3246 avg_execution_time: Duration::from_millis(0),
3247 health_status: ToolHealthStatus::Healthy,
3248 tags: vec!["test".to_string()],
3249 },
3250 }
3251 }
3252
3253 async fn create_test_dht() -> DHT {
3255 use crate::dht::core_engine::NodeId;
3256 let hash = blake3::hash(b"test_node_id");
3257 let local_id = NodeId::from_bytes(*hash.as_bytes());
3258 DHT::new(local_id).expect("Failed to create test DHT")
3259 }
3260
3261 fn create_test_context(caller_id: PeerId) -> MCPCallContext {
3263 MCPCallContext {
3264 caller_id,
3265 timestamp: SystemTime::now(),
3266 timeout: Duration::from_secs(30),
3267 auth_info: None,
3268 metadata: HashMap::new(),
3269 }
3270 }
3271
3272 #[tokio::test]
3273 async fn test_mcp_server_creation() {
3274 let server = create_test_mcp_server().await;
3275 assert_eq!(server.config.server_name, "test_server");
3276 assert_eq!(server.config.server_version, "1.0.0");
3277 assert!(!server.config.enable_auth);
3278 assert!(!server.config.enable_rate_limiting);
3279 }
3280
3281 #[tokio::test]
3282 async fn test_tool_registration() -> Result<()> {
3283 let server = create_test_mcp_server().await;
3284 let tool = create_test_tool("test_calculator");
3285
3286 server.register_tool(tool).await?;
3288
3289 let tools = server.tools.read().await;
3291 assert!(tools.contains_key("test_calculator"));
3292 assert_eq!(
3293 tools
3294 .get("test_calculator")
3295 .expect("Should succeed in test")
3296 .definition
3297 .name,
3298 "test_calculator"
3299 );
3300
3301 let stats = server.stats.read().await;
3303 assert_eq!(stats.total_tools, 1);
3304
3305 Ok(())
3306 }
3307
3308 #[tokio::test]
3309 async fn test_tool_registration_duplicate() -> Result<()> {
3310 let server = create_test_mcp_server().await;
3311 let tool1 = create_test_tool("duplicate_tool");
3312 let tool2 = create_test_tool("duplicate_tool");
3313
3314 server.register_tool(tool1).await?;
3316
3317 let result = server.register_tool(tool2).await;
3319 assert!(result.is_err());
3320 assert!(
3321 result
3322 .unwrap_err()
3323 .to_string()
3324 .contains("Tool already exists")
3325 );
3326
3327 Ok(())
3328 }
3329
3330 #[tokio::test]
3331 async fn test_tool_validation() {
3332 let server = create_test_mcp_server().await;
3333
3334 let mut invalid_tool = create_test_tool("");
3336 let result = server.validate_tool(&invalid_tool).await;
3337 assert!(result.is_err());
3338
3339 invalid_tool.definition.name = "a".repeat(200);
3341 let result = server.validate_tool(&invalid_tool).await;
3342 assert!(result.is_err());
3343
3344 let mut invalid_schema_tool = create_test_tool("valid_name");
3346 invalid_schema_tool.definition.input_schema = json!("not an object");
3347 let result = server.validate_tool(&invalid_schema_tool).await;
3348 assert!(result.is_err());
3349
3350 let valid_tool = create_test_tool("valid_tool");
3352 let result = server.validate_tool(&valid_tool).await;
3353 assert!(result.is_ok());
3354 }
3355
3356 #[tokio::test]
3357 async fn test_tool_call_success() -> Result<()> {
3358 let server = create_test_mcp_server().await;
3359 let tool = create_test_tool("success_tool");
3360 server.register_tool(tool).await?;
3361
3362 let caller_id = "test_peer_123".to_string();
3363 let context = create_test_context(caller_id);
3364 let arguments = json!({"input": "test data"});
3365
3366 let result = server
3367 .call_tool("success_tool", arguments.clone(), context)
3368 .await?;
3369
3370 assert_eq!(result["tool"], "success_tool");
3372 assert_eq!(result["arguments"], arguments);
3373 assert_eq!(result["result"], "success");
3374
3375 let tools = server.tools.read().await;
3377 let tool_metadata = &tools
3378 .get("success_tool")
3379 .ok_or_else(|| {
3380 P2PError::Mcp(crate::error::McpError::ToolNotFound(
3381 "Tool not found".into(),
3382 ))
3383 })?
3384 .metadata;
3385 assert_eq!(tool_metadata.call_count, 1);
3386 assert!(tool_metadata.last_called.is_some());
3387
3388 Ok(())
3389 }
3390
3391 #[tokio::test]
3392 async fn test_tool_call_nonexistent() -> Result<()> {
3393 let server = create_test_mcp_server().await;
3394 let caller_id = "test_peer_456".to_string();
3395 let context = create_test_context(caller_id);
3396 let arguments = json!({"input": "test"});
3397
3398 let result = server
3399 .call_tool("nonexistent_tool", arguments, context)
3400 .await;
3401 assert!(result.is_err());
3402 assert!(result.unwrap_err().to_string().contains("Tool not found"));
3403
3404 Ok(())
3405 }
3406
3407 #[tokio::test]
3408 async fn test_tool_call_handler_error() -> Result<()> {
3409 let server = create_test_mcp_server().await;
3410 let tool = Tool {
3411 definition: MCPTool {
3412 name: "error_tool".to_string(),
3413 description: "Tool that always errors".to_string(),
3414 input_schema: json!({"type": "object"}),
3415 },
3416 handler: Box::new(TestTool::new("error_tool").with_error()),
3417 metadata: ToolMetadata {
3418 created_at: SystemTime::now(),
3419 last_called: None,
3420 call_count: 0,
3421 avg_execution_time: Duration::from_millis(0),
3422 health_status: ToolHealthStatus::Healthy,
3423 tags: vec![],
3424 },
3425 };
3426
3427 server.register_tool(tool).await?;
3428
3429 let caller_id = "test_peer_error".to_string();
3430 let context = create_test_context(caller_id);
3431 let arguments = json!({"input": "test"});
3432
3433 let result = server.call_tool("error_tool", arguments, context).await;
3434 assert!(result.is_err());
3435 assert!(
3436 result
3437 .unwrap_err()
3438 .to_string()
3439 .contains("Test error from tool error_tool")
3440 );
3441
3442 Ok(())
3443 }
3444
3445 #[tokio::test]
3446 async fn test_tool_call_timeout() -> Result<()> {
3447 let server = create_test_mcp_server().await;
3448 let slow_tool = Tool {
3449 definition: MCPTool {
3450 name: "slow_tool".to_string(),
3451 description: "Tool that takes too long".to_string(),
3452 input_schema: json!({"type": "object"}),
3453 },
3454 handler: Box::new(
3455 TestTool::new("slow_tool").with_execution_time(Duration::from_secs(2)),
3456 ),
3457 metadata: ToolMetadata {
3458 created_at: SystemTime::now(),
3459 last_called: None,
3460 call_count: 0,
3461 avg_execution_time: Duration::from_millis(0),
3462 health_status: ToolHealthStatus::Healthy,
3463 tags: vec![],
3464 },
3465 };
3466
3467 server.register_tool(slow_tool).await?;
3468
3469 let caller_id = "test_peer_error".to_string();
3470 let context = create_test_context(caller_id);
3471 let arguments = json!({"input": "test"});
3472
3473 let result = timeout(
3475 Duration::from_millis(100),
3476 server.call_tool("slow_tool", arguments, context),
3477 )
3478 .await;
3479
3480 assert!(result.is_err()); Ok(())
3483 }
3484
3485 #[tokio::test]
3486 async fn test_tool_requirements() {
3487 let tool = TestTool::new("req_tool");
3488 let requirements = tool.get_requirements();
3489
3490 assert_eq!(requirements.max_memory, Some(1024 * 1024));
3491 assert_eq!(
3492 requirements.max_execution_time,
3493 Some(Duration::from_secs(5))
3494 );
3495 assert_eq!(requirements.required_capabilities, vec!["test"]);
3496 assert!(!requirements.requires_network);
3497 assert!(!requirements.requires_filesystem);
3498 }
3499
3500 #[tokio::test]
3501 async fn test_tool_validation_handler() {
3502 let tool = TestTool::new("validation_tool");
3503
3504 let valid_args = json!({"key": "value"});
3506 assert!(tool.validate(&valid_args).is_ok());
3507
3508 let invalid_args = json!("not an object");
3510 assert!(tool.validate(&invalid_args).is_err());
3511
3512 let invalid_args = json!(123);
3513 assert!(tool.validate(&invalid_args).is_err());
3514 }
3515
3516 #[tokio::test]
3517 async fn test_tool_health_status() {
3518 let mut metadata = ToolMetadata {
3519 created_at: SystemTime::now(),
3520 last_called: None,
3521 call_count: 0,
3522 avg_execution_time: Duration::from_millis(0),
3523 health_status: ToolHealthStatus::Healthy,
3524 tags: vec![],
3525 };
3526
3527 assert_eq!(metadata.health_status, ToolHealthStatus::Healthy);
3529
3530 metadata.health_status = ToolHealthStatus::Degraded;
3531 assert_eq!(metadata.health_status, ToolHealthStatus::Degraded);
3532
3533 metadata.health_status = ToolHealthStatus::Unhealthy;
3534 assert_eq!(metadata.health_status, ToolHealthStatus::Unhealthy);
3535
3536 metadata.health_status = ToolHealthStatus::Disabled;
3537 assert_eq!(metadata.health_status, ToolHealthStatus::Disabled);
3538 }
3539
3540 #[tokio::test]
3541 async fn test_mcp_capabilities() {
3542 let server = create_test_mcp_server().await;
3543 let capabilities = server.get_server_capabilities().await;
3544
3545 assert!(capabilities.tools.is_some());
3546 assert!(capabilities.prompts.is_some());
3547 assert!(capabilities.resources.is_some());
3548 assert!(capabilities.logging.is_some());
3549
3550 let tools_cap = capabilities.tools.expect("Test assertion failed");
3551 assert_eq!(tools_cap.list_changed, Some(true));
3552
3553 let logging_cap = capabilities.logging.expect("Test assertion failed");
3554 let levels = logging_cap.levels.expect("Test assertion failed");
3555 assert!(levels.contains(&MCPLogLevel::Debug));
3556 assert!(levels.contains(&MCPLogLevel::Info));
3557 assert!(levels.contains(&MCPLogLevel::Warning));
3558 assert!(levels.contains(&MCPLogLevel::Error));
3559 }
3560
3561 #[tokio::test]
3562 async fn test_mcp_message_serialization() {
3563 let init_msg = MCPMessage::Initialize {
3565 protocol_version: MCP_VERSION.to_string(),
3566 capabilities: MCPCapabilities {
3567 experimental: None,
3568 sampling: None,
3569 tools: Some(MCPToolsCapability {
3570 list_changed: Some(true),
3571 }),
3572 prompts: None,
3573 resources: None,
3574 logging: None,
3575 },
3576 client_info: MCPClientInfo {
3577 name: "test_client".to_string(),
3578 version: "1.0.0".to_string(),
3579 },
3580 };
3581
3582 let serialized = serde_json::to_string(&init_msg).expect("Test assertion failed");
3583 let deserialized: MCPMessage =
3584 serde_json::from_str(&serialized).expect("Test assertion failed");
3585
3586 match deserialized {
3587 MCPMessage::Initialize {
3588 protocol_version,
3589 client_info,
3590 ..
3591 } => {
3592 assert_eq!(protocol_version, MCP_VERSION);
3593 assert_eq!(client_info.name, "test_client");
3594 assert_eq!(client_info.version, "1.0.0");
3595 }
3596 _ => panic!("Wrong message type after deserialization"),
3597 }
3598 }
3599
3600 #[tokio::test]
3601 async fn test_mcp_content_types() {
3602 let text_content = MCPContent::Text {
3604 text: "Hello, world!".to_string(),
3605 };
3606
3607 let serialized = serde_json::to_string(&text_content).expect("Test assertion failed");
3608 let deserialized: MCPContent =
3609 serde_json::from_str(&serialized).expect("Test assertion failed");
3610
3611 match deserialized {
3612 MCPContent::Text { text } => assert_eq!(text, "Hello, world!"),
3613 _ => panic!("Wrong content type"),
3614 }
3615
3616 let image_content = MCPContent::Image {
3618 data: "base64data".to_string(),
3619 mime_type: "image/png".to_string(),
3620 };
3621
3622 let serialized = serde_json::to_string(&image_content).expect("Test assertion failed");
3623 let deserialized: MCPContent =
3624 serde_json::from_str(&serialized).expect("Test assertion failed");
3625
3626 match deserialized {
3627 MCPContent::Image { data, mime_type } => {
3628 assert_eq!(data, "base64data");
3629 assert_eq!(mime_type, "image/png");
3630 }
3631 _ => panic!("Wrong content type"),
3632 }
3633 }
3634
3635 #[tokio::test]
3636 async fn test_service_health_status() {
3637 let mut metrics = ServiceLoadMetrics {
3638 active_requests: 0,
3639 requests_per_second: 0.0,
3640 avg_response_time_ms: 0.0,
3641 error_rate: 0.0,
3642 cpu_usage: 0.0,
3643 memory_usage: 0,
3644 };
3645
3646 let metadata = MCPServiceMetadata {
3648 name: "test_service".to_string(),
3649 version: "1.0.0".to_string(),
3650 description: Some("Test service".to_string()),
3651 tags: vec!["test".to_string()],
3652 health_status: ServiceHealthStatus::Healthy,
3653 load_metrics: metrics.clone(),
3654 };
3655
3656 assert_eq!(metadata.health_status, ServiceHealthStatus::Healthy);
3657
3658 metrics.error_rate = 0.5; let degraded_metadata = MCPServiceMetadata {
3661 health_status: ServiceHealthStatus::Degraded,
3662 load_metrics: metrics.clone(),
3663 ..metadata.clone()
3664 };
3665
3666 assert_eq!(
3667 degraded_metadata.health_status,
3668 ServiceHealthStatus::Degraded
3669 );
3670
3671 let unhealthy_metadata = MCPServiceMetadata {
3672 health_status: ServiceHealthStatus::Unhealthy,
3673 ..metadata.clone()
3674 };
3675
3676 assert_eq!(
3677 unhealthy_metadata.health_status,
3678 ServiceHealthStatus::Unhealthy
3679 );
3680 }
3681
3682 #[tokio::test]
3683 async fn test_p2p_mcp_message() {
3684 let source_peer = "source_peer_123".to_string();
3685 let target_peer = "target_peer_456".to_string();
3686
3687 let p2p_message = P2PMCPMessage {
3688 message_type: P2PMCPMessageType::Request,
3689 message_id: uuid::Uuid::new_v4().to_string(),
3690 source_peer: source_peer.clone(),
3691 target_peer: Some(target_peer.clone()),
3692 timestamp: SystemTime::now()
3693 .duration_since(UNIX_EPOCH)
3694 .unwrap_or_else(|_| Duration::from_secs(0))
3695 .as_secs(),
3696 payload: MCPMessage::ListTools { cursor: None },
3697 ttl: 10,
3698 };
3699
3700 let serialized = serde_json::to_string(&p2p_message).expect("Test assertion failed");
3702 let deserialized: P2PMCPMessage =
3703 serde_json::from_str(&serialized).expect("Test assertion failed");
3704
3705 assert_eq!(deserialized.message_type, P2PMCPMessageType::Request);
3706 assert_eq!(deserialized.source_peer, source_peer);
3707 assert_eq!(deserialized.target_peer, Some(target_peer));
3708 assert_eq!(deserialized.ttl, 10);
3709
3710 match deserialized.payload {
3711 MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3712 _ => panic!("Wrong message payload type"),
3713 }
3714 }
3715
3716 #[tokio::test]
3717 async fn test_tool_requirements_default() {
3718 let default_requirements = ToolRequirements::default();
3719
3720 assert_eq!(default_requirements.max_memory, Some(100 * 1024 * 1024));
3721 assert_eq!(
3722 default_requirements.max_execution_time,
3723 Some(Duration::from_secs(30))
3724 );
3725 assert!(default_requirements.required_capabilities.is_empty());
3726 assert!(!default_requirements.requires_network);
3727 assert!(!default_requirements.requires_filesystem);
3728 }
3729
3730 #[tokio::test]
3731 async fn test_mcp_server_stats() -> Result<()> {
3732 let server = create_test_mcp_server().await;
3733
3734 let stats = server.stats.read().await;
3736 assert_eq!(stats.total_tools, 0);
3737 assert_eq!(stats.total_requests, 0);
3738 assert_eq!(stats.total_responses, 0);
3739 assert_eq!(stats.total_errors, 0);
3740
3741 drop(stats);
3742
3743 let tool = create_test_tool("stats_test_tool");
3745 server.register_tool(tool).await?;
3746
3747 let stats = server.stats.read().await;
3748 assert_eq!(stats.total_tools, 1);
3749
3750 Ok(())
3751 }
3752
3753 #[tokio::test]
3754 async fn test_log_levels() {
3755 let levels = vec![
3757 MCPLogLevel::Debug,
3758 MCPLogLevel::Info,
3759 MCPLogLevel::Notice,
3760 MCPLogLevel::Warning,
3761 MCPLogLevel::Error,
3762 MCPLogLevel::Critical,
3763 MCPLogLevel::Alert,
3764 MCPLogLevel::Emergency,
3765 ];
3766
3767 for level in levels {
3768 let serialized = serde_json::to_string(&level).expect("Test assertion failed");
3769 let deserialized: MCPLogLevel =
3770 serde_json::from_str(&serialized).expect("Test assertion failed");
3771 assert_eq!(level as u8, deserialized as u8);
3772 }
3773 }
3774
3775 #[tokio::test]
3776 async fn test_mcp_endpoint() {
3777 let endpoint = MCPEndpoint {
3778 protocol: "p2p".to_string(),
3779 address: "127.0.0.1".to_string(),
3780 port: Some(9000),
3781 tls: true,
3782 auth_required: true,
3783 };
3784
3785 let serialized = serde_json::to_string(&endpoint).expect("Test assertion failed");
3786 let deserialized: MCPEndpoint =
3787 serde_json::from_str(&serialized).expect("Test assertion failed");
3788
3789 assert_eq!(deserialized.protocol, "p2p");
3790 assert_eq!(deserialized.address, "127.0.0.1");
3791 assert_eq!(deserialized.port, Some(9000));
3792 assert!(deserialized.tls);
3793 assert!(deserialized.auth_required);
3794 }
3795
3796 #[tokio::test]
3797 async fn test_mcp_service_metadata() {
3798 let load_metrics = ServiceLoadMetrics {
3799 active_requests: 5,
3800 requests_per_second: 10.5,
3801 avg_response_time_ms: 250.0,
3802 error_rate: 0.01,
3803 cpu_usage: 45.5,
3804 memory_usage: 1024 * 1024 * 100, };
3806
3807 let metadata = MCPServiceMetadata {
3808 name: "test_service".to_string(),
3809 version: "2.1.0".to_string(),
3810 description: Some("A test service for unit testing".to_string()),
3811 tags: vec!["test".to_string(), "unit".to_string(), "mcp".to_string()],
3812 health_status: ServiceHealthStatus::Healthy,
3813 load_metrics,
3814 };
3815
3816 let serialized = serde_json::to_string(&metadata).expect("Test assertion failed");
3818 let deserialized: MCPServiceMetadata =
3819 serde_json::from_str(&serialized).expect("Test assertion failed");
3820
3821 assert_eq!(deserialized.name, "test_service");
3822 assert_eq!(deserialized.version, "2.1.0");
3823 assert_eq!(
3824 deserialized.description,
3825 Some("A test service for unit testing".to_string())
3826 );
3827 assert_eq!(deserialized.tags, vec!["test", "unit", "mcp"]);
3828 assert_eq!(deserialized.health_status, ServiceHealthStatus::Healthy);
3829 assert_eq!(deserialized.load_metrics.active_requests, 5);
3830 assert_eq!(deserialized.load_metrics.requests_per_second, 10.5);
3831 }
3832
3833 #[tokio::test]
3834 async fn test_function_tool_handler() -> Result<()> {
3835 let handler = FunctionToolHandler::new(|args: Value| async move {
3837 let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("world");
3838 Ok(json!({"greeting": format!("Hello, {}!", name)}))
3839 });
3840
3841 let args = json!({"name": "Alice"});
3842 let result = handler.execute(args).await?;
3843 assert_eq!(result["greeting"], "Hello, Alice!");
3844
3845 let empty_args = json!({});
3847 let result = handler.execute(empty_args).await?;
3848 assert_eq!(result["greeting"], "Hello, world!");
3849
3850 Ok(())
3851 }
3852
3853 #[tokio::test]
3854 async fn test_mcp_service_creation() {
3855 let service_id = "test_service_123".to_string();
3856 let node_id = "test_node_789".to_string();
3857
3858 let service = MCPService::new(service_id.clone(), node_id.clone());
3859
3860 assert_eq!(service.service_id, service_id);
3861 assert_eq!(service.node_id, node_id);
3862 assert!(service.tools.is_empty());
3863 assert_eq!(service.metadata.name, "MCP Service");
3864 assert_eq!(service.metadata.version, "1.0.0");
3865 assert_eq!(service.metadata.health_status, ServiceHealthStatus::Healthy);
3866 assert_eq!(service.endpoint.protocol, "p2p");
3867 assert!(!service.endpoint.tls);
3868 assert!(!service.endpoint.auth_required);
3869 }
3870
3871 #[tokio::test]
3872 async fn test_mcp_capabilities_default() {
3873 let capabilities = MCPCapabilities::default();
3874
3875 assert!(capabilities.tools.is_some());
3876 assert!(capabilities.prompts.is_some());
3877 assert!(capabilities.resources.is_some());
3878 assert!(capabilities.logging.is_some());
3879
3880 let tools_cap = capabilities.tools.expect("Test assertion failed");
3881 assert_eq!(tools_cap.list_changed, Some(true));
3882
3883 let resources_cap = capabilities.resources.expect("Test assertion failed");
3884 assert_eq!(resources_cap.subscribe, Some(true));
3885 assert_eq!(resources_cap.list_changed, Some(true));
3886
3887 let logging_cap = capabilities.logging.expect("Test assertion failed");
3888 let levels = logging_cap.levels.expect("Test assertion failed");
3889 assert!(levels.contains(&MCPLogLevel::Debug));
3890 assert!(levels.contains(&MCPLogLevel::Info));
3891 assert!(levels.contains(&MCPLogLevel::Warning));
3892 assert!(levels.contains(&MCPLogLevel::Error));
3893 }
3894
3895 #[tokio::test]
3896 async fn test_mcp_request_creation() {
3897 let source_peer = "source_peer_123".to_string();
3898 let target_peer = "target_peer_456".to_string();
3899
3900 let request = MCPRequest {
3901 request_id: uuid::Uuid::new_v4().to_string(),
3902 source_peer: source_peer.clone(),
3903 target_peer: target_peer.clone(),
3904 message: MCPMessage::ListTools { cursor: None },
3905 timestamp: SystemTime::now(),
3906 timeout: Duration::from_secs(30),
3907 auth_token: Some("test_token".to_string()),
3908 };
3909
3910 assert_eq!(request.source_peer, source_peer);
3911 assert_eq!(request.target_peer, target_peer);
3912 assert_eq!(request.timeout, Duration::from_secs(30));
3913 assert_eq!(request.auth_token, Some("test_token".to_string()));
3914
3915 match request.message {
3916 MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3917 _ => panic!("Wrong message type"),
3918 }
3919 }
3920
3921 #[tokio::test]
3922 async fn test_p2p_message_types() {
3923 assert_eq!(P2PMCPMessageType::Request, P2PMCPMessageType::Request);
3925 assert_eq!(P2PMCPMessageType::Response, P2PMCPMessageType::Response);
3926 assert_eq!(
3927 P2PMCPMessageType::ServiceAdvertisement,
3928 P2PMCPMessageType::ServiceAdvertisement
3929 );
3930 assert_eq!(
3931 P2PMCPMessageType::ServiceDiscovery,
3932 P2PMCPMessageType::ServiceDiscovery
3933 );
3934
3935 for msg_type in [
3937 P2PMCPMessageType::Request,
3938 P2PMCPMessageType::Response,
3939 P2PMCPMessageType::ServiceAdvertisement,
3940 P2PMCPMessageType::ServiceDiscovery,
3941 ] {
3942 let serialized = serde_json::to_string(&msg_type).expect("Test assertion failed");
3943 let deserialized: P2PMCPMessageType =
3944 serde_json::from_str(&serialized).expect("Test assertion failed");
3945 assert_eq!(msg_type, deserialized);
3946 }
3947 }
3948}