ant_core/
mcp.rs

1//! Model Context Protocol (MCP) Server Implementation
2//!
3//! This module provides a fully-featured MCP server that integrates with the P2P network,
4//! enabling AI agents to discover and call tools across the distributed network.
5//! 
6//! The implementation includes:
7//! - MCP message routing over P2P transport
8//! - Tool registration and discovery through DHT
9//! - Security and authentication for remote calls
10//! - Service health monitoring and load balancing
11
12pub mod security;
13
14use crate::dht::{Key, DHT};
15use crate::{PeerId, Result, P2PError};
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value};
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime, Instant};
21use tokio::sync::{RwLock, mpsc, oneshot};
22use tokio::time::timeout;
23use tracing::{debug, info};
24use rand;
25
26pub use security::*;
27
/// MCP protocol version string used by this module (date-formatted revision).
pub const MCP_VERSION: &str = "2024-11-05";

/// Maximum message size for MCP calls: `1024 * 1024` (1 MiB, presumably counted in bytes).
pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;

/// Default timeout for MCP calls (30 seconds).
///
/// Also used as the default `request_timeout` of [`MCPServerConfig`].
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);

/// MCP protocol identifier for P2P messaging.
pub const MCP_PROTOCOL: &str = "/p2p-foundation/mcp/1.0.0";

/// Service discovery refresh interval (60 seconds).
pub const SERVICE_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
42
/// MCP message types.
///
/// Requests, their matching `*Result` responses, notifications and the error
/// response all share this one enum. With the serde attributes below, each
/// variant is (de)serialized as a map carrying a `type` field that holds the
/// snake_case variant name, next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MCPMessage {
    /// Initialize MCP session
    Initialize {
        /// MCP protocol version being used
        protocol_version: String,
        /// Client capabilities for this session
        capabilities: MCPCapabilities,
        /// Information about the connecting client
        client_info: MCPClientInfo,
    },
    /// Initialize response
    InitializeResult {
        /// MCP protocol version the server supports
        protocol_version: String,
        /// Server capabilities for this session
        capabilities: MCPCapabilities,
        /// Information about the MCP server
        server_info: MCPServerInfo,
    },
    /// List available tools
    ListTools {
        /// Pagination cursor for large tool lists
        cursor: Option<String>,
    },
    /// List tools response
    ListToolsResult {
        /// Available tools on this server
        tools: Vec<MCPTool>,
        /// Next pagination cursor if more tools available
        next_cursor: Option<String>,
    },
    /// Call a tool
    CallTool {
        /// Name of the tool to call
        name: String,
        /// Arguments to pass to the tool (arbitrary JSON value)
        arguments: Value,
    },
    /// Tool call response
    CallToolResult {
        /// Content returned by the tool
        content: Vec<MCPContent>,
        /// Whether the call resulted in an error
        is_error: bool,
    },
    /// List available prompts
    ListPrompts {
        /// Pagination cursor for large prompt lists
        cursor: Option<String>,
    },
    /// List prompts response
    ListPromptsResult {
        /// Available prompts on this server
        prompts: Vec<MCPPrompt>,
        /// Next pagination cursor if more prompts available
        next_cursor: Option<String>,
    },
    /// Get a prompt
    GetPrompt {
        /// Name of the prompt to retrieve
        name: String,
        /// Optional arguments to customize the prompt
        arguments: Option<Value>,
    },
    /// Get prompt response
    GetPromptResult {
        /// Description of the prompt, if any
        description: Option<String>,
        /// Prompt messages/content
        messages: Vec<MCPPromptMessage>,
    },
    /// List available resources
    ListResources {
        /// Pagination cursor for large resource lists
        cursor: Option<String>,
    },
    /// List resources response
    ListResourcesResult {
        /// Available resources on this server
        resources: Vec<MCPResource>,
        /// Next pagination cursor if more resources available
        next_cursor: Option<String>,
    },
    /// Read a resource
    ReadResource {
        /// URI of the resource to read
        uri: String,
    },
    /// Read resource response
    ReadResourceResult {
        /// Contents of the requested resource
        contents: Vec<MCPResourceContent>,
    },
    /// Subscribe to resource
    SubscribeResource {
        /// URI of the resource to subscribe to
        uri: String,
    },
    /// Unsubscribe from resource
    UnsubscribeResource {
        /// URI of the resource to unsubscribe from
        uri: String,
    },
    /// Resource updated notification
    ResourceUpdated {
        /// URI of the resource that was updated
        uri: String,
    },
    /// List logs
    ListLogs {
        /// Pagination cursor for large log lists
        cursor: Option<String>,
    },
    /// List logs response
    ListLogsResult {
        /// Log entries available on this server
        logs: Vec<MCPLogEntry>,
        /// Next pagination cursor if more logs available
        next_cursor: Option<String>,
    },
    /// Set log level
    SetLogLevel {
        /// Log level to set for the server
        level: MCPLogLevel,
    },
    /// Error response
    Error {
        /// Error code identifying the type of error
        code: i32,
        /// Human-readable error message
        message: String,
        /// Optional additional error data (arbitrary JSON value)
        data: Option<Value>,
    },
}
181
/// MCP capabilities exchanged in `Initialize` / `InitializeResult` messages
/// and stored in service descriptors.
///
/// Every field is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPCapabilities {
    /// Experimental capabilities (free-form JSON)
    pub experimental: Option<Value>,
    /// Sampling capability (free-form JSON)
    pub sampling: Option<Value>,
    /// Tools capability
    pub tools: Option<MCPToolsCapability>,
    /// Prompts capability
    pub prompts: Option<MCPPromptsCapability>,
    /// Resources capability
    pub resources: Option<MCPResourcesCapability>,
    /// Logging capability
    pub logging: Option<MCPLoggingCapability>,
}
198
/// Tools capability configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPToolsCapability {
    /// Optional "list changed" flag.
    ///
    /// NOTE(review): presumably signals that the tool list may change and that
    /// change notifications are emitted (MCP `listChanged`), rather than
    /// whether tools are supported at all — confirm against the MCP spec.
    pub list_changed: Option<bool>,
}
205
/// Prompts capability configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPromptsCapability {
    /// Optional "list changed" flag.
    ///
    /// NOTE(review): presumably signals that the prompt list may change and
    /// that change notifications are emitted (MCP `listChanged`), rather than
    /// whether prompts are supported at all — confirm against the MCP spec.
    pub list_changed: Option<bool>,
}
212
/// Resources capability configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourcesCapability {
    /// Optional flag for resource subscriptions.
    ///
    /// NOTE(review): presumably indicates whether `SubscribeResource` /
    /// `UnsubscribeResource` are supported — confirm against the MCP spec.
    pub subscribe: Option<bool>,
    /// Optional "list changed" flag.
    ///
    /// NOTE(review): presumably signals that resource-list change
    /// notifications are emitted (MCP `listChanged`) — confirm.
    pub list_changed: Option<bool>,
}
221
/// Logging capability configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPLoggingCapability {
    /// Available log levels, if advertised
    pub levels: Option<Vec<MCPLogLevel>>,
}
228
/// MCP client information, carried by the `Initialize` message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPClientInfo {
    /// Client name
    pub name: String,
    /// Client version
    pub version: String,
}
237
/// MCP server information, carried by the `InitializeResult` message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServerInfo {
    /// Server name
    pub name: String,
    /// Server version
    pub version: String,
}
246
/// MCP tool definition: the serializable description of a tool, as returned
/// in `ListToolsResult`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPTool {
    /// Tool name
    pub name: String,
    /// Tool description
    pub description: String,
    /// Input schema (JSON Schema)
    pub input_schema: Value,
}
257
/// MCP tool implementation: a tool definition paired with the handler that
/// executes it and bookkeeping metadata.
pub struct Tool {
    /// Tool definition
    pub definition: MCPTool,
    /// Tool handler (boxed trait object; must be `Send + Sync`)
    pub handler: Box<dyn ToolHandler + Send + Sync>,
    /// Tool metadata
    pub metadata: ToolMetadata,
}
267
/// Tool metadata for tracking and optimization
#[derive(Debug, Clone)]
pub struct ToolMetadata {
    /// Tool creation time
    pub created_at: SystemTime,
    /// Last call time (`None` if no call time has been recorded)
    pub last_called: Option<SystemTime>,
    /// Total number of calls
    pub call_count: u64,
    /// Average execution time
    pub avg_execution_time: Duration,
    /// Tool health status
    pub health_status: ToolHealthStatus,
    /// Tags for categorization
    pub tags: Vec<String>,
}
284
/// Tool health status.
///
/// A fieldless enum, so equality is total: it derives `Eq` alongside
/// `PartialEq`, which lets the type be used wherever full equivalence is
/// required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToolHealthStatus {
    /// Tool is healthy and responsive
    Healthy,
    /// Tool is experiencing issues
    Degraded,
    /// Tool is not responding
    Unhealthy,
    /// Tool is disabled
    Disabled,
}
297
/// Tool handler trait: the executable side of a [`Tool`].
///
/// Only `execute` must be implemented; `validate` and `get_requirements`
/// have default implementations.
pub trait ToolHandler {
    /// Execute the tool with given arguments.
    ///
    /// Returns a boxed, pinned, `Send` future that may borrow from `self`
    /// and resolves to the tool's JSON result or an error.
    fn execute(&self, arguments: Value) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>>;
    
    /// Validate tool arguments.
    ///
    /// The default implementation accepts every input and returns `Ok(())`.
    fn validate(&self, arguments: &Value) -> Result<()> {
        // Default implementation - no validation
        let _ = arguments;
        Ok(())
    }
    
    /// Get tool resource requirements.
    ///
    /// The default implementation returns `ToolRequirements::default()`.
    fn get_requirements(&self) -> ToolRequirements {
        ToolRequirements::default()
    }
}
315
/// Resource requirements a tool declares for its execution.
#[derive(Debug, Clone)]
pub struct ToolRequirements {
    /// Maximum memory usage in bytes
    pub max_memory: Option<u64>,
    /// Maximum execution time allowed for tool calls
    pub max_execution_time: Option<Duration>,
    /// Required capabilities that must be available
    pub required_capabilities: Vec<String>,
    /// Whether this tool requires network access
    pub requires_network: bool,
    /// Whether this tool requires file system access
    pub requires_filesystem: bool,
}

impl Default for ToolRequirements {
    /// Defaults: 100 MiB of memory, 30 seconds of execution time, no required
    /// capabilities, and neither network nor file system access.
    fn default() -> Self {
        // 100 MiB, expressed in bytes.
        const DEFAULT_MAX_MEMORY: u64 = 100 * 1024 * 1024;
        const DEFAULT_MAX_EXECUTION_TIME: Duration = Duration::from_secs(30);

        Self {
            requires_filesystem: false,
            requires_network: false,
            required_capabilities: Vec::new(),
            max_execution_time: Some(DEFAULT_MAX_EXECUTION_TIME),
            max_memory: Some(DEFAULT_MAX_MEMORY),
        }
    }
}
342
/// MCP content types.
///
/// With the serde attributes below, each variant is (de)serialized as a map
/// with a `type` field holding the snake_case variant name (`text`, `image`,
/// `resource`) next to the variant's fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MCPContent {
    /// Text content
    Text {
        /// The text content
        text: String,
    },
    /// Image content
    Image {
        /// Base64-encoded image data
        data: String,
        /// MIME type of the image
        mime_type: String,
    },
    /// Resource content
    Resource {
        /// Reference to an MCP resource
        resource: MCPResourceReference,
    },
}
365
/// MCP resource reference
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourceReference {
    /// Resource URI
    pub uri: String,
    /// Resource type, if known.
    ///
    /// NOTE(review): there is no serde rename on this field, so it is
    /// serialized under the key `type_` (with the trailing underscore) —
    /// confirm that is the intended wire name.
    pub type_: Option<String>,
}
374
/// MCP prompt definition, as returned in `ListPromptsResult`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPrompt {
    /// Prompt name
    pub name: String,
    /// Prompt description, if any
    pub description: Option<String>,
    /// Prompt arguments schema, if any (free-form JSON)
    pub arguments: Option<Value>,
}
385
/// MCP prompt message: one role-tagged piece of content in a
/// `GetPromptResult`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPPromptMessage {
    /// Message role
    pub role: MCPRole,
    /// Message content
    pub content: MCPContent,
}
394
/// MCP message roles.
///
/// Serialized as the snake_case variant name (`user`, `assistant`, `system`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MCPRole {
    /// User message
    User,
    /// Assistant message
    Assistant,
    /// System message
    System,
}
406
/// MCP resource definition, as returned in `ListResourcesResult`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResource {
    /// Resource URI
    pub uri: String,
    /// Resource name
    pub name: String,
    /// Resource description, if any
    pub description: Option<String>,
    /// Resource MIME type, if known
    pub mime_type: Option<String>,
}
419
/// MCP resource content, as returned in `ReadResourceResult`.
///
/// NOTE(review): `text` and `blob` are independent options; presumably
/// exactly one of them is populated per entry — confirm with producers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPResourceContent {
    /// Content URI
    pub uri: String,
    /// Content MIME type
    pub mime_type: String,
    /// Textual content data
    pub text: Option<String>,
    /// Binary content (base64 encoded)
    pub blob: Option<String>,
}
432
/// MCP log levels.
///
/// Serialized as the snake_case variant name (`debug`, `info`, ...).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum MCPLogLevel {
    /// Debug level
    Debug,
    /// Info level
    Info,
    /// Notice level
    Notice,
    /// Warning level
    Warning,
    /// Error level
    Error,
    /// Critical level
    Critical,
    /// Alert level
    Alert,
    /// Emergency level
    Emergency,
}
454
/// MCP log entry, as returned in `ListLogsResult`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPLogEntry {
    /// Log level
    pub level: MCPLogLevel,
    /// Log message payload (arbitrary JSON value)
    pub data: Value,
    /// Logger name, if any
    pub logger: Option<String>,
}
465
/// MCP service descriptor for discovery.
///
/// The server keeps these in its local service registry and its remote
/// service cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPService {
    /// Service identifier
    pub service_id: String,
    /// Node providing the service
    pub node_id: PeerId,
    /// Available tools (presumably tool names — verify against producers)
    pub tools: Vec<String>,
    /// Service capabilities
    pub capabilities: MCPCapabilities,
    /// Service metadata
    pub metadata: MCPServiceMetadata,
    /// Service registration time
    pub registered_at: SystemTime,
    /// Service endpoint information
    pub endpoint: MCPEndpoint,
}
484
/// MCP service metadata: descriptive and operational details of a service
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServiceMetadata {
    /// Service name
    pub name: String,
    /// Service version
    pub version: String,
    /// Service description, if any
    pub description: Option<String>,
    /// Service tags
    pub tags: Vec<String>,
    /// Service health status
    pub health_status: ServiceHealthStatus,
    /// Service load metrics
    pub load_metrics: ServiceLoadMetrics,
}
501
/// Service health status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ServiceHealthStatus {
    /// Service is healthy
    Healthy,
    /// Service is degraded
    Degraded,
    /// Service is unhealthy
    Unhealthy,
    /// Service is in maintenance mode
    Maintenance,
}
514
/// Service load metrics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceLoadMetrics {
    /// Current active requests
    pub active_requests: u32,
    /// Requests per second
    pub requests_per_second: f64,
    /// Average response time in milliseconds
    pub avg_response_time_ms: f64,
    /// Error rate (0.0-1.0)
    pub error_rate: f64,
    /// CPU usage percentage
    /// (NOTE(review): scale — 0-100 vs 0.0-1.0 — is not stated; confirm)
    pub cpu_usage: f64,
    /// Memory usage in bytes
    pub memory_usage: u64,
}
531
/// MCP endpoint information: how a service can be reached
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPEndpoint {
    /// Endpoint protocol (p2p, http, etc.)
    pub protocol: String,
    /// Endpoint address
    pub address: String,
    /// Endpoint port, if applicable
    pub port: Option<u16>,
    /// Whether TLS is enabled
    pub tls: bool,
    /// Whether authentication is required
    pub auth_required: bool,
}
546
/// MCP request with routing information.
///
/// In-process representation only: unlike [`P2PMCPMessage`] it does not
/// derive the serde traits.
#[derive(Debug, Clone)]
pub struct MCPRequest {
    /// Request ID
    pub request_id: String,
    /// Source peer
    pub source_peer: PeerId,
    /// Target peer
    pub target_peer: PeerId,
    /// MCP message
    pub message: MCPMessage,
    /// Request timestamp
    pub timestamp: SystemTime,
    /// Request timeout
    pub timeout: Duration,
    /// Authentication token, if any
    pub auth_token: Option<String>,
}
565
/// P2P MCP message wrapper for network transmission
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct P2PMCPMessage {
    /// Message type
    pub message_type: P2PMCPMessageType,
    /// Request/Response ID for correlation
    pub message_id: String,
    /// Source peer ID
    pub source_peer: PeerId,
    /// Target peer ID (optional for broadcasts)
    pub target_peer: Option<PeerId>,
    /// Timestamp
    /// (NOTE(review): unit and epoch are not stated here; presumably seconds
    /// since the Unix epoch — confirm against the code that builds messages)
    pub timestamp: u64,
    /// MCP message payload
    pub payload: MCPMessage,
    /// Message TTL for routing
    pub ttl: u8,
}
584
/// P2P MCP message types: the kind of a [`P2PMCPMessage`]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum P2PMCPMessageType {
    /// Direct request to a specific peer
    Request,
    /// Response to a request
    Response,
    /// Service advertisement
    ServiceAdvertisement,
    /// Service discovery query
    ServiceDiscovery,
}
597
/// MCP response with metadata
#[derive(Debug, Clone)]
pub struct MCPResponse {
    /// Request ID this response corresponds to
    pub request_id: String,
    /// Response message
    pub message: MCPMessage,
    /// Response timestamp
    pub timestamp: SystemTime,
    /// Processing time
    pub processing_time: Duration,
}
610
/// MCP call context: per-call information handed to `MCPServer::call_tool`
#[derive(Debug, Clone)]
pub struct MCPCallContext {
    /// Caller peer ID (used for rate-limit and permission checks)
    pub caller_id: PeerId,
    /// Call timestamp
    pub timestamp: SystemTime,
    /// Call timeout
    pub timeout: Duration,
    /// Authentication information, if the caller supplied any
    pub auth_info: Option<MCPAuthInfo>,
    /// Call metadata (free-form key/value pairs)
    pub metadata: HashMap<String, String>,
}
625
/// MCP authentication information
#[derive(Debug, Clone)]
pub struct MCPAuthInfo {
    /// Authentication token
    pub token: String,
    /// Token type
    pub token_type: String,
    /// Token expiration, if any
    pub expires_at: Option<SystemTime>,
    /// Granted permissions
    pub permissions: Vec<String>,
}
638
/// MCP server configuration.
///
/// See the `Default` implementation for the stock values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPServerConfig {
    /// Server name
    pub server_name: String,
    /// Server version
    pub server_version: String,
    /// Enable tool discovery via DHT
    pub enable_dht_discovery: bool,
    /// Maximum concurrent requests
    pub max_concurrent_requests: usize,
    /// Request timeout
    pub request_timeout: Duration,
    /// Enable authentication (when set, `MCPServer::new` creates a security manager)
    pub enable_auth: bool,
    /// Enable rate limiting
    pub enable_rate_limiting: bool,
    /// Rate limit: requests per minute
    pub rate_limit_rpm: u32,
    /// Enable request logging
    pub enable_logging: bool,
    /// Maximum tool execution time
    pub max_tool_execution_time: Duration,
    /// Tool memory limit
    pub tool_memory_limit: u64,
}
665
666impl Default for MCPServerConfig {
667    fn default() -> Self {
668        Self {
669            server_name: "P2P-MCP-Server".to_string(),
670            server_version: crate::VERSION.to_string(),
671            enable_dht_discovery: true,
672            max_concurrent_requests: 100,
673            request_timeout: DEFAULT_CALL_TIMEOUT,
674            enable_auth: true,
675            enable_rate_limiting: true,
676            rate_limit_rpm: 60,
677            enable_logging: true,
678            max_tool_execution_time: Duration::from_secs(30),
679            tool_memory_limit: 100 * 1024 * 1024, // 100MB
680        }
681    }
682}
683
/// Main MCP server implementation.
///
/// Holds the tool registry, session table, service registries and statistics
/// behind `Arc<RwLock<..>>` so they can be shared across async tasks.
pub struct MCPServer {
    /// Server configuration
    config: MCPServerConfig,
    /// Registered tools, keyed by tool name
    tools: Arc<RwLock<HashMap<String, Tool>>>,
    /// Registered prompts (reserved for future implementation)
    #[allow(dead_code)]
    prompts: Arc<RwLock<HashMap<String, MCPPrompt>>>,
    /// Registered resources (reserved for future implementation)
    #[allow(dead_code)]
    resources: Arc<RwLock<HashMap<String, MCPResource>>>,
    /// Active sessions (presumably keyed by session ID — verify)
    sessions: Arc<RwLock<HashMap<String, MCPSession>>>,
    /// Request handlers: one-shot response channels
    /// (presumably keyed by request ID — verify)
    request_handlers: Arc<RwLock<HashMap<String, oneshot::Sender<MCPResponse>>>>,
    /// DHT reference for service discovery (`None` until `with_dht` is called)
    dht: Option<Arc<RwLock<DHT>>>,
    /// Local service registry
    local_services: Arc<RwLock<HashMap<String, MCPService>>>,
    /// Remote service cache
    remote_services: Arc<RwLock<HashMap<String, MCPService>>>,
    /// Request statistics
    stats: Arc<RwLock<MCPServerStats>>,
    /// Message channel for incoming requests
    request_tx: mpsc::UnboundedSender<MCPRequest>,
    /// Message channel for outgoing responses (reserved for future implementation)
    #[allow(dead_code)]
    response_rx: Arc<RwLock<mpsc::UnboundedReceiver<MCPResponse>>>,
    /// Security manager (`None` when authentication is disabled in the config)
    security_manager: Option<Arc<MCPSecurityManager>>,
    /// Audit logger
    audit_logger: Arc<SecurityAuditLogger>,
}
718
/// MCP session information
#[derive(Debug, Clone)]
pub struct MCPSession {
    /// Session ID
    pub session_id: String,
    /// Peer ID
    pub peer_id: PeerId,
    /// Client capabilities, if known
    pub client_capabilities: Option<MCPCapabilities>,
    /// Session start time
    pub started_at: SystemTime,
    /// Last activity time
    pub last_activity: SystemTime,
    /// Session state
    pub state: MCPSessionState,
    /// Subscribed resources (presumably resource URIs — verify)
    pub subscribed_resources: Vec<String>,
}
737
/// MCP session state.
///
/// A fieldless enum, so equality is total: it derives `Eq` alongside
/// `PartialEq`, which lets the type be used wherever full equivalence is
/// required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MCPSessionState {
    /// Session is initializing
    Initializing,
    /// Session is active
    Active,
    /// Session is inactive
    Inactive,
    /// Session is terminated
    Terminated,
}
750
/// Aggregate statistics kept by the MCP server.
#[derive(Debug, Clone)]
pub struct MCPServerStats {
    /// Total requests processed
    pub total_requests: u64,
    /// Total responses sent
    pub total_responses: u64,
    /// Total errors
    pub total_errors: u64,
    /// Average response time
    pub avg_response_time: Duration,
    /// Active sessions
    pub active_sessions: u32,
    /// Total tools registered
    pub total_tools: u32,
    /// Most called tools (per-tool call counters)
    pub popular_tools: HashMap<String, u64>,
    /// Server start time
    pub server_started_at: SystemTime,
}

impl Default for MCPServerStats {
    /// All counters start at zero, the tool table is empty, and the start
    /// time is captured at the moment of construction.
    fn default() -> Self {
        let server_started_at = SystemTime::now();

        Self {
            server_started_at,
            popular_tools: HashMap::default(),
            avg_response_time: Duration::default(),
            total_tools: 0,
            active_sessions: 0,
            total_errors: 0,
            total_responses: 0,
            total_requests: 0,
        }
    }
}
786
787impl MCPServer {
788    /// Create a new MCP server
789    pub fn new(config: MCPServerConfig) -> Self {
790        let (request_tx, _request_rx) = mpsc::unbounded_channel();
791        let (_response_tx, response_rx) = mpsc::unbounded_channel();
792        
793        // Initialize security manager if authentication is enabled
794        let security_manager = if config.enable_auth {
795            // Generate a random secret key for token signing
796            let secret_key = (0..32).map(|_| rand::random::<u8>()).collect();
797            Some(Arc::new(MCPSecurityManager::new(secret_key, config.rate_limit_rpm)))
798        } else {
799            None
800        };
801        
802        let server = Self {
803            config,
804            tools: Arc::new(RwLock::new(HashMap::new())),
805            prompts: Arc::new(RwLock::new(HashMap::new())),
806            resources: Arc::new(RwLock::new(HashMap::new())),
807            sessions: Arc::new(RwLock::new(HashMap::new())),
808            request_handlers: Arc::new(RwLock::new(HashMap::new())),
809            dht: None,
810            local_services: Arc::new(RwLock::new(HashMap::new())),
811            remote_services: Arc::new(RwLock::new(HashMap::new())),
812            stats: Arc::new(RwLock::new(MCPServerStats::default())),
813            request_tx,
814            response_rx: Arc::new(RwLock::new(response_rx)),
815            security_manager,
816            audit_logger: Arc::new(SecurityAuditLogger::new(10000)), // Keep 10k audit entries
817        };
818        
819        server
820    }
821    
822    /// Create MCP server with DHT integration
823    pub fn with_dht(mut self, dht: Arc<RwLock<DHT>>) -> Self {
824        self.dht = Some(dht);
825        self
826    }
827    
828    /// Start the MCP server
829    pub async fn start(&self) -> Result<()> {
830        info!("Starting MCP server: {}", self.config.server_name);
831        
832        // Start request processing task
833        self.start_request_processor().await?;
834        
835        // Start service discovery if DHT is available
836        if self.dht.is_some() {
837            self.start_service_discovery().await?;
838        }
839        
840        // Start health monitoring
841        self.start_health_monitor().await?;
842        
843        info!("MCP server started successfully");
844        Ok(())
845    }
846    
847    /// Register a tool
848    pub async fn register_tool(&self, tool: Tool) -> Result<()> {
849        let tool_name = tool.definition.name.clone();
850        
851        // Validate tool
852        self.validate_tool(&tool).await?;
853        
854        // Register locally
855        {
856            let mut tools = self.tools.write().await;
857            tools.insert(tool_name.clone(), tool);
858        }
859        
860        // Update statistics
861        {
862            let mut stats = self.stats.write().await;
863            stats.total_tools += 1;
864        }
865        
866        // Register in DHT if available
867        if let Some(dht) = &self.dht {
868            self.register_tool_in_dht(&tool_name, dht).await?;
869        }
870        
871        info!("Registered tool: {}", tool_name);
872        Ok(())
873    }
874    
875    /// Validate tool before registration
876    async fn validate_tool(&self, tool: &Tool) -> Result<()> {
877        // Check for duplicate names
878        let tools = self.tools.read().await;
879        if tools.contains_key(&tool.definition.name) {
880            return Err(P2PError::MCP(format!("Tool already exists: {}", tool.definition.name)).into());
881        }
882        
883        // Validate tool name
884        if tool.definition.name.is_empty() || tool.definition.name.len() > 100 {
885            return Err(P2PError::MCP("Invalid tool name".to_string()).into());
886        }
887        
888        // Validate schema
889        if !tool.definition.input_schema.is_object() {
890            return Err(P2PError::MCP("Tool input schema must be an object".to_string()).into());
891        }
892        
893        Ok(())
894    }
895    
896    /// Register tool in DHT for discovery
897    async fn register_tool_in_dht(&self, tool_name: &str, dht: &Arc<RwLock<DHT>>) -> Result<()> {
898        let key = Key::new(format!("mcp:tool:{}", tool_name).as_bytes());
899        let service_info = json!({
900            "tool_name": tool_name,
901            "node_id": "local_node", // TODO: Get actual node ID
902            "registered_at": SystemTime::now().duration_since(std::time::UNIX_EPOCH).map_err(|e| P2PError::Network(format!("Time error: {}", e)))?.as_secs(),
903            "capabilities": self.get_server_capabilities().await
904        });
905        
906        let dht_guard = dht.read().await;
907        dht_guard.put(key, serde_json::to_vec(&service_info)?).await?;
908        
909        Ok(())
910    }
911    
912    /// Get server capabilities
913    async fn get_server_capabilities(&self) -> MCPCapabilities {
914        MCPCapabilities {
915            experimental: None,
916            sampling: None,
917            tools: Some(MCPToolsCapability {
918                list_changed: Some(true),
919            }),
920            prompts: Some(MCPPromptsCapability {
921                list_changed: Some(true),
922            }),
923            resources: Some(MCPResourcesCapability {
924                subscribe: Some(true),
925                list_changed: Some(true),
926            }),
927            logging: Some(MCPLoggingCapability {
928                levels: Some(vec![
929                    MCPLogLevel::Debug,
930                    MCPLogLevel::Info,
931                    MCPLogLevel::Warning,
932                    MCPLogLevel::Error,
933                ]),
934            }),
935        }
936    }
937    
938    /// Call a tool by name
939    pub async fn call_tool(&self, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
940        let start_time = Instant::now();
941        
942        // Security checks
943        
944        // 1. Check rate limiting
945        if !self.check_rate_limit(&context.caller_id).await? {
946            return Err(P2PError::MCP("Rate limit exceeded".to_string()));
947        }
948        
949        // 2. Check tool execution permission
950        if !self.check_permission(&context.caller_id, &MCPPermission::ExecuteTools).await? {
951            return Err(P2PError::MCP("Permission denied: execute tools".to_string()));
952        }
953        
954        // 3. Check tool-specific security policy
955        let tool_security_level = self.get_tool_security_policy(tool_name).await;
956        let is_trusted = self.is_trusted_peer(&context.caller_id).await;
957        
958        match tool_security_level {
959            SecurityLevel::Admin => {
960                if !self.check_permission(&context.caller_id, &MCPPermission::Admin).await? {
961                    return Err(P2PError::MCP("Permission denied: admin access required".to_string()));
962                }
963            }
964            SecurityLevel::Strong => {
965                if !is_trusted {
966                    return Err(P2PError::MCP("Permission denied: trusted peer required".to_string()));
967                }
968            }
969            SecurityLevel::Basic => {
970                // Check if authentication is enabled and token is valid
971                if self.config.enable_auth {
972                    if let Some(auth_info) = &context.auth_info {
973                        self.verify_auth_token(&auth_info.token).await?;
974                    } else {
975                        return Err(P2PError::MCP("Authentication required".to_string()));
976                    }
977                }
978            }
979            SecurityLevel::Public => {
980                // No additional checks needed
981            }
982        }
983        
984        // Log the tool call attempt
985        let mut details = HashMap::new();
986        details.insert("action".to_string(), "tool_call".to_string());
987        details.insert("tool_name".to_string(), tool_name.to_string());
988        details.insert("security_level".to_string(), format!("{:?}", tool_security_level));
989        
990        self.audit_logger.log_event(
991            "tool_execution".to_string(),
992            context.caller_id.clone(),
993            details,
994            AuditSeverity::Info,
995        ).await;
996        
997        // Check if tool exists
998        let tool_exists = {
999            let tools = self.tools.read().await;
1000            tools.contains_key(tool_name)
1001        };
1002        
1003        if !tool_exists {
1004            return Err(P2PError::MCP(format!("Tool not found: {}", tool_name)).into());
1005        }
1006        
1007        // Validate arguments and get requirements
1008        let requirements = {
1009            let tools = self.tools.read().await;
1010            let tool = tools.get(tool_name).unwrap(); // Safe because we checked exists above
1011            
1012            // Validate arguments
1013            if let Err(e) = tool.handler.validate(&arguments) {
1014                return Err(P2PError::MCP(format!("Tool validation failed: {}", e)).into());
1015            }
1016            
1017            // Get resource requirements
1018            tool.handler.get_requirements()
1019        };
1020        
1021        // Check resource requirements
1022        self.check_resource_requirements(&requirements).await?;
1023        
1024        // Execute tool in a spawned task to avoid borrow checker issues
1025        let tools_clone = self.tools.clone();
1026        let tool_name_owned = tool_name.to_string();
1027        let execution_timeout = context.timeout.min(requirements.max_execution_time.unwrap_or(context.timeout));
1028        
1029        let result = timeout(execution_timeout, async move {
1030            let tools = tools_clone.read().await;
1031            let tool = tools.get(&tool_name_owned).unwrap(); // Safe because we checked exists above
1032            tool.handler.execute(arguments).await
1033        }).await
1034        .map_err(|_| P2PError::MCP("Tool execution timeout".to_string()))?
1035        .map_err(|e| P2PError::MCP(format!("Tool execution failed: {}", e)))?;
1036        
1037        let execution_time = start_time.elapsed();
1038        
1039        // Update tool statistics
1040        self.update_tool_stats(tool_name, execution_time, true).await;
1041        
1042        // Update server statistics
1043        {
1044            let mut stats = self.stats.write().await;
1045            stats.total_requests += 1;
1046            stats.total_responses += 1;
1047            
1048            // Update average response time
1049            let new_total_time = stats.avg_response_time.mul_f64(stats.total_responses as f64 - 1.0) + execution_time;
1050            stats.avg_response_time = new_total_time.div_f64(stats.total_responses as f64);
1051            
1052            // Update popular tools
1053            *stats.popular_tools.entry(tool_name.to_string()).or_insert(0) += 1;
1054        }
1055        
1056        debug!("Tool '{}' executed in {:?}", tool_name, execution_time);
1057        Ok(result)
1058    }
1059    
1060    /// Check if resource requirements can be met
1061    async fn check_resource_requirements(&self, requirements: &ToolRequirements) -> Result<()> {
1062        // Check memory limit
1063        if let Some(max_memory) = requirements.max_memory {
1064            if max_memory > self.config.tool_memory_limit {
1065                return Err(P2PError::MCP("Tool memory requirement exceeds limit".to_string()).into());
1066            }
1067        }
1068        
1069        // Check execution time limit
1070        if let Some(max_execution_time) = requirements.max_execution_time {
1071            if max_execution_time > self.config.max_tool_execution_time {
1072                return Err(P2PError::MCP("Tool execution time requirement exceeds limit".to_string()).into());
1073            }
1074        }
1075        
1076        // TODO: Check other requirements (capabilities, network, filesystem)
1077        
1078        Ok(())
1079    }
1080    
1081    /// Update tool execution statistics
1082    async fn update_tool_stats(&self, tool_name: &str, execution_time: Duration, success: bool) {
1083        let mut tools = self.tools.write().await;
1084        if let Some(tool) = tools.get_mut(tool_name) {
1085            tool.metadata.call_count += 1;
1086            tool.metadata.last_called = Some(SystemTime::now());
1087            
1088            // Update average execution time
1089            let new_total_time = tool.metadata.avg_execution_time.mul_f64(tool.metadata.call_count as f64 - 1.0) + execution_time;
1090            tool.metadata.avg_execution_time = new_total_time.div_f64(tool.metadata.call_count as f64);
1091            
1092            // Update health status based on success
1093            if !success {
1094                tool.metadata.health_status = match tool.metadata.health_status {
1095                    ToolHealthStatus::Healthy => ToolHealthStatus::Degraded,
1096                    ToolHealthStatus::Degraded => ToolHealthStatus::Unhealthy,
1097                    other => other,
1098                };
1099            } else if tool.metadata.health_status != ToolHealthStatus::Disabled {
1100                tool.metadata.health_status = ToolHealthStatus::Healthy;
1101            }
1102        }
1103    }
1104    
1105    /// List available tools
1106    pub async fn list_tools(&self, _cursor: Option<String>) -> Result<(Vec<MCPTool>, Option<String>)> {
1107        let tools = self.tools.read().await;
1108        let tool_definitions: Vec<MCPTool> = tools.values()
1109            .map(|tool| tool.definition.clone())
1110            .collect();
1111        
1112        // For simplicity, return all tools without pagination
1113        // In a real implementation, you'd implement proper cursor-based pagination
1114        Ok((tool_definitions, None))
1115    }
1116    
1117    /// Start request processing task
1118    async fn start_request_processor(&self) -> Result<()> {
1119        let _request_tx = self.request_tx.clone();
1120        let _server_clone = Arc::new(self);
1121        
1122        tokio::spawn(async move {
1123            info!("MCP request processor started");
1124            
1125            // In a real implementation, this would listen for incoming MCP requests
1126            // and process them through a receiver channel. For now, we'll implement
1127            // the message handling infrastructure without the actual network loop.
1128            
1129            loop {
1130                // Sleep to prevent busy loop - in real implementation,
1131                // this would block on receiving messages
1132                tokio::time::sleep(Duration::from_millis(100)).await;
1133                
1134                // Check if we should shutdown
1135                // This is a placeholder - real implementation would have proper shutdown signaling
1136                break;
1137            }
1138            
1139            info!("MCP request processor stopped");
1140        });
1141        
1142        Ok(())
1143    }
1144    
1145    /// Start service discovery task
1146    async fn start_service_discovery(&self) -> Result<()> {
1147        if let Some(dht) = self.dht.clone() {
1148            let _stats = self.stats.clone();
1149            let remote_services = self.remote_services.clone();
1150            
1151            tokio::spawn(async move {
1152                info!("MCP service discovery started");
1153                
1154                loop {
1155                    // Periodically discover services
1156                    tokio::time::sleep(SERVICE_DISCOVERY_INTERVAL).await;
1157                    
1158                    // Query DHT for MCP services
1159                    let key = Key::new(b"mcp:services");
1160                    let dht_guard = dht.read().await;
1161                    
1162                    match dht_guard.get(&key).await {
1163                        Some(record) => {
1164                            match serde_json::from_slice::<Vec<MCPService>>(&record.value) {
1165                                Ok(services) => {
1166                                    debug!("Discovered {} MCP services", services.len());
1167                                    
1168                                    // Update remote services cache
1169                                    {
1170                                        let mut remote_cache = remote_services.write().await;
1171                                        for service in services {
1172                                            remote_cache.insert(service.service_id.clone(), service);
1173                                        }
1174                                    }
1175                                }
1176                                Err(e) => {
1177                                    debug!("Failed to deserialize services: {}", e);
1178                                }
1179                            }
1180                        }
1181                        None => {
1182                            debug!("No MCP services found in DHT");
1183                        }
1184                    }
1185                }
1186            });
1187        }
1188        
1189        Ok(())
1190    }
1191    
1192    /// Start health monitoring task
1193    async fn start_health_monitor(&self) -> Result<()> {
1194        // TODO: Implement health monitoring
1195        // This would check tool health and update status
1196        Ok(())
1197    }
1198    
1199    /// Get server statistics
1200    pub async fn get_stats(&self) -> MCPServerStats {
1201        self.stats.read().await.clone()
1202    }
1203    
1204    /// Discover remote services in the network
1205    pub async fn discover_remote_services(&self) -> Result<Vec<MCPService>> {
1206        if let Some(dht) = &self.dht {
1207            let key = Key::new(b"mcp:services");
1208            let dht_guard = dht.read().await;
1209            
1210            match dht_guard.get(&key).await {
1211                Some(record) => {
1212                    match serde_json::from_slice::<Vec<MCPService>>(&record.value) {
1213                        Ok(services) => {
1214                            // Update remote services cache
1215                            {
1216                                let mut remote_services = self.remote_services.write().await;
1217                                for service in &services {
1218                                    remote_services.insert(service.service_id.clone(), service.clone());
1219                                }
1220                            }
1221                            Ok(services)
1222                        }
1223                        Err(e) => {
1224                            debug!("Failed to deserialize services: {}", e);
1225                            Ok(Vec::new())
1226                        }
1227                    }
1228                }
1229                None => Ok(Vec::new()),
1230            }
1231        } else {
1232            Ok(Vec::new())
1233        }
1234    }
1235    
1236    /// Call a tool on a remote node
1237    pub async fn call_remote_tool(&self, peer_id: &PeerId, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
1238        let request_id = uuid::Uuid::new_v4().to_string();
1239        
1240        // Create MCP call tool message
1241        let mcp_message = MCPMessage::CallTool {
1242            name: tool_name.to_string(),
1243            arguments,
1244        };
1245        
1246        // Create P2P message wrapper
1247        let p2p_message = P2PMCPMessage {
1248            message_type: P2PMCPMessageType::Request,
1249            message_id: request_id.clone(),
1250            source_peer: context.caller_id.clone(),
1251            target_peer: Some(peer_id.clone()),
1252            timestamp: SystemTime::now()
1253                .duration_since(std::time::UNIX_EPOCH)
1254                .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1255                .as_secs(),
1256            payload: mcp_message,
1257            ttl: 5, // Max 5 hops
1258        };
1259        
1260        // Serialize the message
1261        let message_data = serde_json::to_vec(&p2p_message)
1262            .map_err(|e| P2PError::Serialization(e))?;
1263        
1264        if message_data.len() > MAX_MESSAGE_SIZE {
1265            return Err(P2PError::MCP("Message too large".to_string()));
1266        }
1267        
1268        // Create response channel
1269        let (response_tx, _response_rx) = oneshot::channel::<MCPResponse>();
1270        
1271        // Store response handler
1272        {
1273            let mut handlers = self.request_handlers.write().await;
1274            handlers.insert(request_id.clone(), response_tx);
1275        }
1276        
1277        // Send via P2P network - this will need to be connected to the network layer
1278        // For now, return an error indicating the need for network integration
1279        Err(P2PError::MCP("Remote tool calling requires P2P network integration".to_string()))
1280    }
1281    
1282    /// Handle incoming P2P MCP message
1283    pub async fn handle_p2p_message(&self, message_data: &[u8], source_peer: &PeerId) -> Result<Option<Vec<u8>>> {
1284        // Deserialize the P2P message
1285        let p2p_message: P2PMCPMessage = serde_json::from_slice(message_data)
1286            .map_err(|e| P2PError::Serialization(e))?;
1287        
1288        debug!("Received MCP message from {}: {:?}", source_peer, p2p_message.message_type);
1289        
1290        match p2p_message.message_type {
1291            P2PMCPMessageType::Request => {
1292                self.handle_remote_request(p2p_message).await
1293            }
1294            P2PMCPMessageType::Response => {
1295                self.handle_remote_response(p2p_message).await?;
1296                Ok(None) // Responses don't generate replies
1297            }
1298            P2PMCPMessageType::ServiceAdvertisement => {
1299                self.handle_service_advertisement(p2p_message).await?;
1300                Ok(None)
1301            }
1302            P2PMCPMessageType::ServiceDiscovery => {
1303                self.handle_service_discovery(p2p_message).await
1304            }
1305        }
1306    }
1307    
1308    /// Handle remote tool call request
1309    async fn handle_remote_request(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
1310        match message.payload {
1311            MCPMessage::CallTool { name, arguments } => {
1312                let context = MCPCallContext {
1313                    caller_id: message.source_peer.clone(),
1314                    timestamp: SystemTime::now(),
1315                    timeout: DEFAULT_CALL_TIMEOUT,
1316                    auth_info: None,
1317                    metadata: HashMap::new(),
1318                };
1319                
1320                // Call the local tool
1321                let result = self.call_tool(&name, arguments, context).await;
1322                
1323                // Create response message
1324                let response_payload = match result {
1325                    Ok(value) => MCPMessage::CallToolResult {
1326                        content: vec![MCPContent::Text { text: value.to_string() }],
1327                        is_error: false,
1328                    },
1329                    Err(e) => MCPMessage::Error {
1330                        code: -1,
1331                        message: e.to_string(),
1332                        data: None,
1333                    },
1334                };
1335                
1336                let response_message = P2PMCPMessage {
1337                    message_type: P2PMCPMessageType::Response,
1338                    message_id: message.message_id,
1339                    source_peer: "local".to_string(), // TODO: Get actual local peer ID
1340                    target_peer: Some(message.source_peer),
1341                    timestamp: SystemTime::now()
1342                        .duration_since(std::time::UNIX_EPOCH)
1343                        .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1344                        .as_secs(),
1345                    payload: response_payload,
1346                    ttl: message.ttl.saturating_sub(1),
1347                };
1348                
1349                // Serialize response
1350                let response_data = serde_json::to_vec(&response_message)
1351                    .map_err(|e| P2PError::Serialization(e))?;
1352                
1353                Ok(Some(response_data))
1354            }
1355            MCPMessage::ListTools { cursor: _ } => {
1356                let (tools, _) = self.list_tools(None).await?;
1357                
1358                let response_payload = MCPMessage::ListToolsResult {
1359                    tools,
1360                    next_cursor: None,
1361                };
1362                
1363                let response_message = P2PMCPMessage {
1364                    message_type: P2PMCPMessageType::Response,
1365                    message_id: message.message_id,
1366                    source_peer: "local".to_string(), // TODO: Get actual local peer ID
1367                    target_peer: Some(message.source_peer),
1368                    timestamp: SystemTime::now()
1369                        .duration_since(std::time::UNIX_EPOCH)
1370                        .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1371                        .as_secs(),
1372                    payload: response_payload,
1373                    ttl: message.ttl.saturating_sub(1),
1374                };
1375                
1376                let response_data = serde_json::to_vec(&response_message)
1377                    .map_err(|e| P2PError::Serialization(e))?;
1378                
1379                Ok(Some(response_data))
1380            }
1381            _ => {
1382                // Unsupported request type
1383                let error_response = P2PMCPMessage {
1384                    message_type: P2PMCPMessageType::Response,
1385                    message_id: message.message_id,
1386                    source_peer: "local".to_string(), // TODO: Get actual local peer ID
1387                    target_peer: Some(message.source_peer),
1388                    timestamp: SystemTime::now()
1389                        .duration_since(std::time::UNIX_EPOCH)
1390                        .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1391                        .as_secs(),
1392                    payload: MCPMessage::Error {
1393                        code: -2,
1394                        message: "Unsupported request type".to_string(),
1395                        data: None,
1396                    },
1397                    ttl: message.ttl.saturating_sub(1),
1398                };
1399                
1400                let response_data = serde_json::to_vec(&error_response)
1401                    .map_err(|e| P2PError::Serialization(e))?;
1402                
1403                Ok(Some(response_data))
1404            }
1405        }
1406    }
1407    
1408    // Security-related methods
1409    
1410    /// Generate authentication token for peer
1411    pub async fn generate_auth_token(&self, peer_id: &PeerId, permissions: Vec<MCPPermission>, ttl: Duration) -> Result<String> {
1412        if let Some(security_manager) = &self.security_manager {
1413            let token = security_manager.generate_token(peer_id, permissions, ttl).await?;
1414            
1415            // Log authentication event
1416            let mut details = HashMap::new();
1417            details.insert("action".to_string(), "token_generated".to_string());
1418            details.insert("ttl_seconds".to_string(), ttl.as_secs().to_string());
1419            
1420            self.audit_logger.log_event(
1421                "authentication".to_string(),
1422                peer_id.clone(),
1423                details,
1424                AuditSeverity::Info,
1425            ).await;
1426            
1427            Ok(token)
1428        } else {
1429            Err(P2PError::MCP("Authentication not enabled".to_string()))
1430        }
1431    }
1432    
1433    /// Verify authentication token
1434    pub async fn verify_auth_token(&self, token: &str) -> Result<TokenPayload> {
1435        if let Some(security_manager) = &self.security_manager {
1436            match security_manager.verify_token(token).await {
1437                Ok(payload) => {
1438                    // Log successful verification
1439                    let mut details = HashMap::new();
1440                    details.insert("action".to_string(), "token_verified".to_string());
1441                    details.insert("subject".to_string(), payload.sub.clone());
1442                    
1443                    self.audit_logger.log_event(
1444                        "authentication".to_string(),
1445                        payload.iss.clone(),
1446                        details,
1447                        AuditSeverity::Info,
1448                    ).await;
1449                    
1450                    Ok(payload)
1451                }
1452                Err(e) => {
1453                    // Log failed verification
1454                    let mut details = HashMap::new();
1455                    details.insert("action".to_string(), "token_verification_failed".to_string());
1456                    details.insert("error".to_string(), e.to_string());
1457                    
1458                    self.audit_logger.log_event(
1459                        "authentication".to_string(),
1460                        "unknown".to_string(),
1461                        details,
1462                        AuditSeverity::Warning,
1463                    ).await;
1464                    
1465                    Err(e)
1466                }
1467            }
1468        } else {
1469            Err(P2PError::MCP("Authentication not enabled".to_string()))
1470        }
1471    }
1472    
1473    /// Check if peer has permission for operation
1474    pub async fn check_permission(&self, peer_id: &PeerId, permission: &MCPPermission) -> Result<bool> {
1475        if let Some(security_manager) = &self.security_manager {
1476            security_manager.check_permission(peer_id, permission).await
1477        } else {
1478            // If security is disabled, allow all operations
1479            Ok(true)
1480        }
1481    }
1482    
1483    /// Check rate limit for peer
1484    pub async fn check_rate_limit(&self, peer_id: &PeerId) -> Result<bool> {
1485        if let Some(security_manager) = &self.security_manager {
1486            let allowed = security_manager.check_rate_limit(peer_id).await?;
1487            
1488            if !allowed {
1489                // Log rate limit violation
1490                let mut details = HashMap::new();
1491                details.insert("action".to_string(), "rate_limit_exceeded".to_string());
1492                
1493                self.audit_logger.log_event(
1494                    "rate_limiting".to_string(),
1495                    peer_id.clone(),
1496                    details,
1497                    AuditSeverity::Warning,
1498                ).await;
1499            }
1500            
1501            Ok(allowed)
1502        } else {
1503            // If rate limiting is disabled, allow all requests
1504            Ok(true)
1505        }
1506    }
1507    
1508    /// Grant permission to peer
1509    pub async fn grant_permission(&self, peer_id: &PeerId, permission: MCPPermission) -> Result<()> {
1510        if let Some(security_manager) = &self.security_manager {
1511            security_manager.grant_permission(peer_id, permission.clone()).await?;
1512            
1513            // Log permission grant
1514            let mut details = HashMap::new();
1515            details.insert("action".to_string(), "permission_granted".to_string());
1516            details.insert("permission".to_string(), permission.as_str().to_string());
1517            
1518            self.audit_logger.log_event(
1519                "authorization".to_string(),
1520                peer_id.clone(),
1521                details,
1522                AuditSeverity::Info,
1523            ).await;
1524            
1525            Ok(())
1526        } else {
1527            Err(P2PError::MCP("Security not enabled".to_string()))
1528        }
1529    }
1530    
1531    /// Revoke permission from peer
1532    pub async fn revoke_permission(&self, peer_id: &PeerId, permission: &MCPPermission) -> Result<()> {
1533        if let Some(security_manager) = &self.security_manager {
1534            security_manager.revoke_permission(peer_id, permission).await?;
1535            
1536            // Log permission revocation
1537            let mut details = HashMap::new();
1538            details.insert("action".to_string(), "permission_revoked".to_string());
1539            details.insert("permission".to_string(), permission.as_str().to_string());
1540            
1541            self.audit_logger.log_event(
1542                "authorization".to_string(),
1543                peer_id.clone(),
1544                details,
1545                AuditSeverity::Info,
1546            ).await;
1547            
1548            Ok(())
1549        } else {
1550            Err(P2PError::MCP("Security not enabled".to_string()))
1551        }
1552    }
1553    
1554    /// Add trusted peer
1555    pub async fn add_trusted_peer(&self, peer_id: PeerId) -> Result<()> {
1556        if let Some(security_manager) = &self.security_manager {
1557            security_manager.add_trusted_peer(peer_id.clone()).await?;
1558            
1559            // Log trusted peer addition
1560            let mut details = HashMap::new();
1561            details.insert("action".to_string(), "trusted_peer_added".to_string());
1562            
1563            self.audit_logger.log_event(
1564                "trust_management".to_string(),
1565                peer_id,
1566                details,
1567                AuditSeverity::Info,
1568            ).await;
1569            
1570            Ok(())
1571        } else {
1572            Err(P2PError::MCP("Security not enabled".to_string()))
1573        }
1574    }
1575    
1576    /// Check if peer is trusted
1577    pub async fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
1578        if let Some(security_manager) = &self.security_manager {
1579            security_manager.is_trusted_peer(peer_id).await
1580        } else {
1581            false
1582        }
1583    }
1584    
1585    /// Set security policy for tool
1586    pub async fn set_tool_security_policy(&self, tool_name: String, level: SecurityLevel) -> Result<()> {
1587        if let Some(security_manager) = &self.security_manager {
1588            security_manager.set_tool_policy(tool_name.clone(), level.clone()).await?;
1589            
1590            // Log policy change
1591            let mut details = HashMap::new();
1592            details.insert("action".to_string(), "tool_policy_set".to_string());
1593            details.insert("tool_name".to_string(), tool_name);
1594            details.insert("security_level".to_string(), format!("{:?}", level));
1595            
1596            self.audit_logger.log_event(
1597                "security_policy".to_string(),
1598                "system".to_string(),
1599                details,
1600                AuditSeverity::Info,
1601            ).await;
1602            
1603            Ok(())
1604        } else {
1605            Err(P2PError::MCP("Security not enabled".to_string()))
1606        }
1607    }
1608    
1609    /// Get security policy for tool
1610    pub async fn get_tool_security_policy(&self, tool_name: &str) -> SecurityLevel {
1611        if let Some(security_manager) = &self.security_manager {
1612            security_manager.get_tool_policy(tool_name).await
1613        } else {
1614            SecurityLevel::Public
1615        }
1616    }
1617    
1618    /// Get peer security statistics
1619    pub async fn get_peer_security_stats(&self, peer_id: &PeerId) -> Option<PeerACL> {
1620        if let Some(security_manager) = &self.security_manager {
1621            security_manager.get_peer_stats(peer_id).await
1622        } else {
1623            None
1624        }
1625    }
1626    
1627    /// Get recent security audit entries
1628    pub async fn get_security_audit(&self, limit: Option<usize>) -> Vec<SecurityAuditEntry> {
1629        self.audit_logger.get_recent_entries(limit).await
1630    }
1631    
1632    /// Perform security housekeeping
1633    pub async fn security_cleanup(&self) -> Result<()> {
1634        if let Some(security_manager) = &self.security_manager {
1635            security_manager.cleanup().await?;
1636        }
1637        Ok(())
1638    }
1639    
1640    /// Handle remote response
1641    async fn handle_remote_response(&self, message: P2PMCPMessage) -> Result<()> {
1642        // Find the waiting request handler
1643        let response_tx = {
1644            let mut handlers = self.request_handlers.write().await;
1645            handlers.remove(&message.message_id)
1646        };
1647        
1648        if let Some(tx) = response_tx {
1649            let response = MCPResponse {
1650                request_id: message.message_id,
1651                message: message.payload,
1652                timestamp: SystemTime::now(),
1653                processing_time: Duration::from_millis(0), // TODO: Calculate actual processing time
1654            };
1655            
1656            // Send response to waiting caller
1657            let _ = tx.send(response);
1658        } else {
1659            debug!("Received response for unknown request: {}", message.message_id);
1660        }
1661        
1662        Ok(())
1663    }
1664    
1665    /// Handle service advertisement
1666    async fn handle_service_advertisement(&self, _message: P2PMCPMessage) -> Result<()> {
1667        // TODO: Parse service advertisement and update remote services cache
1668        Ok(())
1669    }
1670    
1671    /// Handle service discovery request
1672    async fn handle_service_discovery(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
1673        // Create service advertisement with our local services
1674        let local_services: Vec<MCPService> = {
1675            let services = self.local_services.read().await;
1676            services.values().cloned().collect()
1677        };
1678        
1679        if !local_services.is_empty() {
1680            let advertisement = P2PMCPMessage {
1681                message_type: P2PMCPMessageType::ServiceAdvertisement,
1682                message_id: uuid::Uuid::new_v4().to_string(),
1683                source_peer: "local".to_string(), // TODO: Get actual local peer ID
1684                target_peer: Some(message.source_peer),
1685                timestamp: SystemTime::now()
1686                    .duration_since(std::time::UNIX_EPOCH)
1687                    .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1688                    .as_secs(),
1689                payload: MCPMessage::ListToolsResult {
1690                    tools: local_services.into_iter()
1691                        .flat_map(|s| s.tools.into_iter().map(|t| MCPTool {
1692                            name: t,
1693                            description: "Remote tool".to_string(),
1694                            input_schema: json!({"type": "object"}),
1695                        }))
1696                        .collect(),
1697                    next_cursor: None,
1698                },
1699                ttl: message.ttl.saturating_sub(1),
1700            };
1701            
1702            let response_data = serde_json::to_vec(&advertisement)
1703                .map_err(|e| P2PError::Serialization(e))?;
1704            
1705            Ok(Some(response_data))
1706        } else {
1707            Ok(None)
1708        }
1709    }
1710    
1711    /// Shutdown the server
1712    pub async fn shutdown(&self) -> Result<()> {
1713        info!("Shutting down MCP server");
1714        
1715        // Close all sessions
1716        {
1717            let mut sessions = self.sessions.write().await;
1718            for session in sessions.values_mut() {
1719                session.state = MCPSessionState::Terminated;
1720            }
1721            sessions.clear();
1722        }
1723        
1724        // TODO: Cleanup tasks and channels
1725        
1726        info!("MCP server shutdown complete");
1727        Ok(())
1728    }
1729}
1730
1731impl Tool {
1732    /// Create a new tool
1733    pub fn new(name: &str, description: &str, input_schema: Value) -> ToolBuilder {
1734        ToolBuilder {
1735            name: name.to_string(),
1736            description: description.to_string(),
1737            input_schema,
1738            handler: None,
1739            tags: Vec::new(),
1740        }
1741    }
1742}
1743
1744/// Builder for creating tools
1745pub struct ToolBuilder {
1746    name: String,
1747    description: String,
1748    input_schema: Value,
1749    handler: Option<Box<dyn ToolHandler + Send + Sync>>,
1750    tags: Vec<String>,
1751}
1752
1753impl ToolBuilder {
1754    /// Set tool handler
1755    pub fn handler<H: ToolHandler + Send + Sync + 'static>(mut self, handler: H) -> Self {
1756        self.handler = Some(Box::new(handler));
1757        self
1758    }
1759    
1760    /// Add tags
1761    pub fn tags(mut self, tags: Vec<String>) -> Self {
1762        self.tags = tags;
1763        self
1764    }
1765    
1766    /// Build the tool
1767    pub fn build(self) -> Result<Tool> {
1768        let handler = self.handler
1769            .ok_or_else(|| P2PError::MCP("Tool handler is required".to_string()))?;
1770        
1771        let definition = MCPTool {
1772            name: self.name,
1773            description: self.description,
1774            input_schema: self.input_schema,
1775        };
1776        
1777        let metadata = ToolMetadata {
1778            created_at: SystemTime::now(),
1779            last_called: None,
1780            call_count: 0,
1781            avg_execution_time: Duration::from_millis(0),
1782            health_status: ToolHealthStatus::Healthy,
1783            tags: self.tags,
1784        };
1785        
1786        Ok(Tool {
1787            definition,
1788            handler,
1789            metadata,
1790        })
1791    }
1792}
1793
1794/// Simple function-based tool handler
1795pub struct FunctionToolHandler<F> {
1796    function: F,
1797}
1798
1799impl<F, Fut> ToolHandler for FunctionToolHandler<F>
1800where
1801    F: Fn(Value) -> Fut + Send + Sync,
1802    Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
1803{
1804    fn execute(&self, arguments: Value) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>> {
1805        Box::pin((self.function)(arguments))
1806    }
1807}
1808
1809impl<F> FunctionToolHandler<F> {
1810    /// Create a new function-based tool handler
1811    pub fn new(function: F) -> Self {
1812        Self { function }
1813    }
1814}
1815
1816/// MCP service descriptor for discovery and routing
1817impl MCPService {
1818    /// Create a new MCP service descriptor
1819    pub fn new(service_id: String, node_id: PeerId) -> Self {
1820        Self {
1821            service_id,
1822            node_id,
1823            tools: Vec::new(),
1824            capabilities: MCPCapabilities {
1825                experimental: None,
1826                sampling: None,
1827                tools: Some(MCPToolsCapability {
1828                    list_changed: Some(true),
1829                }),
1830                prompts: None,
1831                resources: None,
1832                logging: None,
1833            },
1834            metadata: MCPServiceMetadata {
1835                name: "MCP Service".to_string(),
1836                version: "1.0.0".to_string(),
1837                description: None,
1838                tags: Vec::new(),
1839                health_status: ServiceHealthStatus::Healthy,
1840                load_metrics: ServiceLoadMetrics {
1841                    active_requests: 0,
1842                    requests_per_second: 0.0,
1843                    avg_response_time_ms: 0.0,
1844                    error_rate: 0.0,
1845                    cpu_usage: 0.0,
1846                    memory_usage: 0,
1847                },
1848            },
1849            registered_at: SystemTime::now(),
1850            endpoint: MCPEndpoint {
1851                protocol: "p2p".to_string(),
1852                address: "".to_string(),
1853                port: None,
1854                tls: false,
1855                auth_required: false,
1856            },
1857        }
1858    }
1859}
1860
1861impl Default for MCPCapabilities {
1862    fn default() -> Self {
1863        Self {
1864            experimental: None,
1865            sampling: None,
1866            tools: Some(MCPToolsCapability {
1867                list_changed: Some(true),
1868            }),
1869            prompts: Some(MCPPromptsCapability {
1870                list_changed: Some(true),
1871            }),
1872            resources: Some(MCPResourcesCapability {
1873                subscribe: Some(true),
1874                list_changed: Some(true),
1875            }),
1876            logging: Some(MCPLoggingCapability {
1877                levels: Some(vec![
1878                    MCPLogLevel::Debug,
1879                    MCPLogLevel::Info,
1880                    MCPLogLevel::Warning,
1881                    MCPLogLevel::Error,
1882                ]),
1883            }),
1884        }
1885    }
1886}
1887
1888#[cfg(test)]
1889mod tests {
1890    use super::*;
1891    use crate::dht::{DHT, DHTConfig, Key};
1892    use std::pin::Pin;
1893    use std::future::Future;
1894    use tokio::time::timeout;
1895
1896    /// Test implementation of ToolHandler for unit tests
1897    struct TestTool {
1898        name: String,
1899        should_error: bool,
1900        execution_time: Duration,
1901    }
1902
1903    impl TestTool {
1904        fn new(name: &str) -> Self {
1905            Self {
1906                name: name.to_string(),
1907                should_error: false,
1908                execution_time: Duration::from_millis(10),
1909            }
1910        }
1911
1912        fn with_error(mut self) -> Self {
1913            self.should_error = true;
1914            self
1915        }
1916
1917        fn with_execution_time(mut self, duration: Duration) -> Self {
1918            self.execution_time = duration;
1919            self
1920        }
1921    }
1922
1923    impl ToolHandler for TestTool {
1924        fn execute(&self, arguments: Value) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
1925            let should_error = self.should_error;
1926            let execution_time = self.execution_time;
1927            let name = self.name.clone();
1928
1929            Box::pin(async move {
1930                tokio::time::sleep(execution_time).await;
1931
1932                if should_error {
1933                    return Err(P2PError::MCP(format!("Test error from tool {}", name)).into());
1934                }
1935
1936                // Echo back the arguments with a response marker
1937                Ok(json!({
1938                    "tool": name,
1939                    "arguments": arguments,
1940                    "result": "success"
1941                }))
1942            })
1943        }
1944
1945        fn validate(&self, arguments: &Value) -> Result<()> {
1946            if !arguments.is_object() {
1947                return Err(P2PError::MCP("Arguments must be an object".to_string()).into());
1948            }
1949            Ok(())
1950        }
1951
1952        fn get_requirements(&self) -> ToolRequirements {
1953            ToolRequirements {
1954                max_memory: Some(1024 * 1024), // 1MB
1955                max_execution_time: Some(Duration::from_secs(5)),
1956                required_capabilities: vec!["test".to_string()],
1957                requires_network: false,
1958                requires_filesystem: false,
1959            }
1960        }
1961    }
1962
1963    /// Helper function to create a test MCP server
1964    async fn create_test_mcp_server() -> MCPServer {
1965        let config = MCPServerConfig {
1966            server_name: "test_server".to_string(),
1967            server_version: "1.0.0".to_string(),
1968            enable_auth: false,
1969            enable_rate_limiting: false,
1970            max_concurrent_requests: 10,
1971            request_timeout: Duration::from_secs(30),
1972            enable_dht_discovery: true,
1973            rate_limit_rpm: 60,
1974            enable_logging: true,
1975            max_tool_execution_time: Duration::from_secs(30),
1976            tool_memory_limit: 100 * 1024 * 1024,
1977        };
1978
1979        MCPServer::new(config)
1980    }
1981
1982    /// Helper function to create a test tool
1983    fn create_test_tool(name: &str) -> Tool {
1984        Tool {
1985            definition: MCPTool {
1986                name: name.to_string(),
1987                description: format!("Test tool: {}", name),
1988                input_schema: json!({
1989                    "type": "object",
1990                    "properties": {
1991                        "input": { "type": "string" }
1992                    }
1993                }),
1994            },
1995            handler: Box::new(TestTool::new(name)),
1996            metadata: ToolMetadata {
1997                created_at: SystemTime::now(),
1998                last_called: None,
1999                call_count: 0,
2000                avg_execution_time: Duration::from_millis(0),
2001                health_status: ToolHealthStatus::Healthy,
2002                tags: vec!["test".to_string()],
2003            },
2004        }
2005    }
2006
2007    /// Helper function to create a test DHT
2008    async fn create_test_dht() -> DHT {
2009        let local_id = Key::new(b"test_node_id");
2010        let config = DHTConfig::default();
2011        DHT::new(local_id, config)
2012    }
2013
2014    /// Helper function to create an MCP call context
2015    fn create_test_context(caller_id: PeerId) -> MCPCallContext {
2016        MCPCallContext {
2017            caller_id,
2018            timestamp: SystemTime::now(),
2019            timeout: Duration::from_secs(30),
2020            auth_info: None,
2021            metadata: HashMap::new(),
2022        }
2023    }
2024
2025    #[tokio::test]
2026    async fn test_mcp_server_creation() {
2027        let server = create_test_mcp_server().await;
2028        assert_eq!(server.config.server_name, "test_server");
2029        assert_eq!(server.config.server_version, "1.0.0");
2030        assert!(!server.config.enable_auth);
2031        assert!(!server.config.enable_rate_limiting);
2032    }
2033
2034    #[tokio::test]
2035    async fn test_tool_registration() -> Result<()> {
2036        let server = create_test_mcp_server().await;
2037        let tool = create_test_tool("test_calculator");
2038
2039        // Register the tool
2040        server.register_tool(tool).await?;
2041
2042        // Verify tool is registered
2043        let tools = server.tools.read().await;
2044        assert!(tools.contains_key("test_calculator"));
2045        assert_eq!(tools.get("test_calculator").unwrap().definition.name, "test_calculator");
2046
2047        // Verify stats updated
2048        let stats = server.stats.read().await;
2049        assert_eq!(stats.total_tools, 1);
2050
2051        Ok(())
2052    }
2053
2054    #[tokio::test]
2055    async fn test_tool_registration_duplicate() -> Result<()> {
2056        let server = create_test_mcp_server().await;
2057        let tool1 = create_test_tool("duplicate_tool");
2058        let tool2 = create_test_tool("duplicate_tool");
2059
2060        // Register first tool
2061        server.register_tool(tool1).await?;
2062
2063        // Try to register duplicate - should fail
2064        let result = server.register_tool(tool2).await;
2065        assert!(result.is_err());
2066        assert!(result.unwrap_err().to_string().contains("Tool already exists"));
2067
2068        Ok(())
2069    }
2070
2071    #[tokio::test]
2072    async fn test_tool_validation() {
2073        let server = create_test_mcp_server().await;
2074
2075        // Test invalid tool name (empty)
2076        let mut invalid_tool = create_test_tool("");
2077        let result = server.validate_tool(&invalid_tool).await;
2078        assert!(result.is_err());
2079
2080        // Test invalid tool name (too long)
2081        invalid_tool.definition.name = "a".repeat(200);
2082        let result = server.validate_tool(&invalid_tool).await;
2083        assert!(result.is_err());
2084
2085        // Test invalid schema (not an object)
2086        let mut invalid_schema_tool = create_test_tool("valid_name");
2087        invalid_schema_tool.definition.input_schema = json!("not an object");
2088        let result = server.validate_tool(&invalid_schema_tool).await;
2089        assert!(result.is_err());
2090
2091        // Test valid tool
2092        let valid_tool = create_test_tool("valid_tool");
2093        let result = server.validate_tool(&valid_tool).await;
2094        assert!(result.is_ok());
2095    }
2096
2097    #[tokio::test]
2098    async fn test_tool_call_success() -> Result<()> {
2099        let server = create_test_mcp_server().await;
2100        let tool = create_test_tool("success_tool");
2101        server.register_tool(tool).await?;
2102
2103        let caller_id = "test_peer_123".to_string();
2104        let context = create_test_context(caller_id);
2105        let arguments = json!({"input": "test data"});
2106
2107        let result = server.call_tool("success_tool", arguments.clone(), context).await?;
2108
2109        // Verify response structure
2110        assert_eq!(result["tool"], "success_tool");
2111        assert_eq!(result["arguments"], arguments);
2112        assert_eq!(result["result"], "success");
2113
2114        // Verify tool metadata updated
2115        let tools = server.tools.read().await;
2116        let tool_metadata = &tools.get("success_tool").unwrap().metadata;
2117        assert_eq!(tool_metadata.call_count, 1);
2118        assert!(tool_metadata.last_called.is_some());
2119
2120        Ok(())
2121    }
2122
2123    #[tokio::test]
2124    async fn test_tool_call_nonexistent() -> Result<()> {
2125        let server = create_test_mcp_server().await;
2126        let caller_id = "test_peer_456".to_string();
2127        let context = create_test_context(caller_id);
2128        let arguments = json!({"input": "test"});
2129
2130        let result = server.call_tool("nonexistent_tool", arguments, context).await;
2131        assert!(result.is_err());
2132        assert!(result.unwrap_err().to_string().contains("Tool not found"));
2133
2134        Ok(())
2135    }
2136
2137    #[tokio::test]
2138    async fn test_tool_call_handler_error() -> Result<()> {
2139        let server = create_test_mcp_server().await;
2140        let tool = Tool {
2141            definition: MCPTool {
2142                name: "error_tool".to_string(),
2143                description: "Tool that always errors".to_string(),
2144                input_schema: json!({"type": "object"}),
2145            },
2146            handler: Box::new(TestTool::new("error_tool").with_error()),
2147            metadata: ToolMetadata {
2148                created_at: SystemTime::now(),
2149                last_called: None,
2150                call_count: 0,
2151                avg_execution_time: Duration::from_millis(0),
2152                health_status: ToolHealthStatus::Healthy,
2153                tags: vec![],
2154            },
2155        };
2156
2157        server.register_tool(tool).await?;
2158
2159        let caller_id = "test_peer_error".to_string();
2160        let context = create_test_context(caller_id);
2161        let arguments = json!({"input": "test"});
2162
2163        let result = server.call_tool("error_tool", arguments, context).await;
2164        assert!(result.is_err());
2165        assert!(result.unwrap_err().to_string().contains("Test error from tool error_tool"));
2166
2167        Ok(())
2168    }
2169
2170    #[tokio::test]
2171    async fn test_tool_call_timeout() -> Result<()> {
2172        let server = create_test_mcp_server().await;
2173        let slow_tool = Tool {
2174            definition: MCPTool {
2175                name: "slow_tool".to_string(),
2176                description: "Tool that takes too long".to_string(),
2177                input_schema: json!({"type": "object"}),
2178            },
2179            handler: Box::new(TestTool::new("slow_tool").with_execution_time(Duration::from_secs(2))),
2180            metadata: ToolMetadata {
2181                created_at: SystemTime::now(),
2182                last_called: None,
2183                call_count: 0,
2184                avg_execution_time: Duration::from_millis(0),
2185                health_status: ToolHealthStatus::Healthy,
2186                tags: vec![],
2187            },
2188        };
2189
2190        server.register_tool(slow_tool).await?;
2191
2192        let caller_id = "test_peer_error".to_string();
2193        let context = create_test_context(caller_id);
2194        let arguments = json!({"input": "test"});
2195
2196        // Test with very short timeout
2197        let result = timeout(
2198            Duration::from_millis(100),
2199            server.call_tool("slow_tool", arguments, context)
2200        ).await;
2201
2202        assert!(result.is_err()); // Should timeout
2203
2204        Ok(())
2205    }
2206
2207    #[tokio::test]
2208    async fn test_tool_requirements() {
2209        let tool = TestTool::new("req_tool");
2210        let requirements = tool.get_requirements();
2211
2212        assert_eq!(requirements.max_memory, Some(1024 * 1024));
2213        assert_eq!(requirements.max_execution_time, Some(Duration::from_secs(5)));
2214        assert_eq!(requirements.required_capabilities, vec!["test"]);
2215        assert!(!requirements.requires_network);
2216        assert!(!requirements.requires_filesystem);
2217    }
2218
2219    #[tokio::test]
2220    async fn test_tool_validation_handler() {
2221        let tool = TestTool::new("validation_tool");
2222
2223        // Valid arguments (object)
2224        let valid_args = json!({"key": "value"});
2225        assert!(tool.validate(&valid_args).is_ok());
2226
2227        // Invalid arguments (not an object)
2228        let invalid_args = json!("not an object");
2229        assert!(tool.validate(&invalid_args).is_err());
2230
2231        let invalid_args = json!(123);
2232        assert!(tool.validate(&invalid_args).is_err());
2233    }
2234
2235    #[tokio::test]
2236    async fn test_tool_health_status() {
2237        let mut metadata = ToolMetadata {
2238            created_at: SystemTime::now(),
2239            last_called: None,
2240            call_count: 0,
2241            avg_execution_time: Duration::from_millis(0),
2242            health_status: ToolHealthStatus::Healthy,
2243            tags: vec![],
2244        };
2245
2246        // Test different health statuses
2247        assert_eq!(metadata.health_status, ToolHealthStatus::Healthy);
2248
2249        metadata.health_status = ToolHealthStatus::Degraded;
2250        assert_eq!(metadata.health_status, ToolHealthStatus::Degraded);
2251
2252        metadata.health_status = ToolHealthStatus::Unhealthy;
2253        assert_eq!(metadata.health_status, ToolHealthStatus::Unhealthy);
2254
2255        metadata.health_status = ToolHealthStatus::Disabled;
2256        assert_eq!(metadata.health_status, ToolHealthStatus::Disabled);
2257    }
2258
2259    #[tokio::test]
2260    async fn test_mcp_capabilities() {
2261        let server = create_test_mcp_server().await;
2262        let capabilities = server.get_server_capabilities().await;
2263
2264        assert!(capabilities.tools.is_some());
2265        assert!(capabilities.prompts.is_some());
2266        assert!(capabilities.resources.is_some());
2267        assert!(capabilities.logging.is_some());
2268
2269        let tools_cap = capabilities.tools.unwrap();
2270        assert_eq!(tools_cap.list_changed, Some(true));
2271
2272        let logging_cap = capabilities.logging.unwrap();
2273        let levels = logging_cap.levels.unwrap();
2274        assert!(levels.contains(&MCPLogLevel::Debug));
2275        assert!(levels.contains(&MCPLogLevel::Info));
2276        assert!(levels.contains(&MCPLogLevel::Warning));
2277        assert!(levels.contains(&MCPLogLevel::Error));
2278    }
2279
2280    #[tokio::test]
2281    async fn test_mcp_message_serialization() {
2282        // Test Initialize message
2283        let init_msg = MCPMessage::Initialize {
2284            protocol_version: MCP_VERSION.to_string(),
2285            capabilities: MCPCapabilities {
2286                experimental: None,
2287                sampling: None,
2288                tools: Some(MCPToolsCapability { list_changed: Some(true) }),
2289                prompts: None,
2290                resources: None,
2291                logging: None,
2292            },
2293            client_info: MCPClientInfo {
2294                name: "test_client".to_string(),
2295                version: "1.0.0".to_string(),
2296            },
2297        };
2298
2299        let serialized = serde_json::to_string(&init_msg).unwrap();
2300        let deserialized: MCPMessage = serde_json::from_str(&serialized).unwrap();
2301
2302        match deserialized {
2303            MCPMessage::Initialize { protocol_version, client_info, .. } => {
2304                assert_eq!(protocol_version, MCP_VERSION);
2305                assert_eq!(client_info.name, "test_client");
2306                assert_eq!(client_info.version, "1.0.0");
2307            }
2308            _ => panic!("Wrong message type after deserialization"),
2309        }
2310    }
2311
2312    #[tokio::test]
2313    async fn test_mcp_content_types() {
2314        // Test text content
2315        let text_content = MCPContent::Text {
2316            text: "Hello, world!".to_string(),
2317        };
2318
2319        let serialized = serde_json::to_string(&text_content).unwrap();
2320        let deserialized: MCPContent = serde_json::from_str(&serialized).unwrap();
2321
2322        match deserialized {
2323            MCPContent::Text { text } => assert_eq!(text, "Hello, world!"),
2324            _ => panic!("Wrong content type"),
2325        }
2326
2327        // Test image content
2328        let image_content = MCPContent::Image {
2329            data: "base64data".to_string(),
2330            mime_type: "image/png".to_string(),
2331        };
2332
2333        let serialized = serde_json::to_string(&image_content).unwrap();
2334        let deserialized: MCPContent = serde_json::from_str(&serialized).unwrap();
2335
2336        match deserialized {
2337            MCPContent::Image { data, mime_type } => {
2338                assert_eq!(data, "base64data");
2339                assert_eq!(mime_type, "image/png");
2340            }
2341            _ => panic!("Wrong content type"),
2342        }
2343    }
2344
2345    #[tokio::test]
2346    async fn test_service_health_status() {
2347        let mut metrics = ServiceLoadMetrics {
2348            active_requests: 0,
2349            requests_per_second: 0.0,
2350            avg_response_time_ms: 0.0,
2351            error_rate: 0.0,
2352            cpu_usage: 0.0,
2353            memory_usage: 0,
2354        };
2355
2356        // Test healthy service
2357        let metadata = MCPServiceMetadata {
2358            name: "test_service".to_string(),
2359            version: "1.0.0".to_string(),
2360            description: Some("Test service".to_string()),
2361            tags: vec!["test".to_string()],
2362            health_status: ServiceHealthStatus::Healthy,
2363            load_metrics: metrics.clone(),
2364        };
2365
2366        assert_eq!(metadata.health_status, ServiceHealthStatus::Healthy);
2367
2368        // Test different health statuses
2369        metrics.error_rate = 0.5; // 50% error rate
2370        let degraded_metadata = MCPServiceMetadata {
2371            health_status: ServiceHealthStatus::Degraded,
2372            load_metrics: metrics.clone(),
2373            ..metadata.clone()
2374        };
2375
2376        assert_eq!(degraded_metadata.health_status, ServiceHealthStatus::Degraded);
2377
2378        let unhealthy_metadata = MCPServiceMetadata {
2379            health_status: ServiceHealthStatus::Unhealthy,
2380            ..metadata.clone()
2381        };
2382
2383        assert_eq!(unhealthy_metadata.health_status, ServiceHealthStatus::Unhealthy);
2384    }
2385
2386    #[tokio::test]
2387    async fn test_p2p_mcp_message() {
2388        let source_peer = "source_peer_123".to_string();
2389        let target_peer = "target_peer_456".to_string();
2390
2391        let p2p_message = P2PMCPMessage {
2392            message_type: P2PMCPMessageType::Request,
2393            message_id: uuid::Uuid::new_v4().to_string(),
2394            source_peer: source_peer.clone(),
2395            target_peer: Some(target_peer.clone()),
2396            timestamp: SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(),
2397            payload: MCPMessage::ListTools { cursor: None },
2398            ttl: 10,
2399        };
2400
2401        // Test serialization
2402        let serialized = serde_json::to_string(&p2p_message).unwrap();
2403        let deserialized: P2PMCPMessage = serde_json::from_str(&serialized).unwrap();
2404
2405        assert_eq!(deserialized.message_type, P2PMCPMessageType::Request);
2406        assert_eq!(deserialized.source_peer, source_peer);
2407        assert_eq!(deserialized.target_peer, Some(target_peer));
2408        assert_eq!(deserialized.ttl, 10);
2409
2410        match deserialized.payload {
2411            MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
2412            _ => panic!("Wrong message payload type"),
2413        }
2414    }
2415
2416    #[tokio::test]
2417    async fn test_tool_requirements_default() {
2418        let default_requirements = ToolRequirements::default();
2419
2420        assert_eq!(default_requirements.max_memory, Some(100 * 1024 * 1024));
2421        assert_eq!(default_requirements.max_execution_time, Some(Duration::from_secs(30)));
2422        assert!(default_requirements.required_capabilities.is_empty());
2423        assert!(!default_requirements.requires_network);
2424        assert!(!default_requirements.requires_filesystem);
2425    }
2426
2427    #[tokio::test]
2428    async fn test_mcp_server_stats() {
2429        let server = create_test_mcp_server().await;
2430
2431        // Initial stats should be zero
2432        let stats = server.stats.read().await;
2433        assert_eq!(stats.total_tools, 0);
2434        assert_eq!(stats.total_requests, 0);
2435        assert_eq!(stats.total_responses, 0);
2436        assert_eq!(stats.total_errors, 0);
2437
2438        drop(stats);
2439
2440        // Register a tool and verify stats update
2441        let tool = create_test_tool("stats_test_tool");
2442        server.register_tool(tool).await.unwrap();
2443
2444        let stats = server.stats.read().await;
2445        assert_eq!(stats.total_tools, 1);
2446    }
2447
2448    #[tokio::test]
2449    async fn test_log_levels() {
2450        // Test all log levels serialize/deserialize correctly
2451        let levels = vec![
2452            MCPLogLevel::Debug,
2453            MCPLogLevel::Info,
2454            MCPLogLevel::Notice,
2455            MCPLogLevel::Warning,
2456            MCPLogLevel::Error,
2457            MCPLogLevel::Critical,
2458            MCPLogLevel::Alert,
2459            MCPLogLevel::Emergency,
2460        ];
2461
2462        for level in levels {
2463            let serialized = serde_json::to_string(&level).unwrap();
2464            let deserialized: MCPLogLevel = serde_json::from_str(&serialized).unwrap();
2465            assert_eq!(level as u8, deserialized as u8);
2466        }
2467    }
2468
2469    #[tokio::test]
2470    async fn test_mcp_endpoint() {
2471        let endpoint = MCPEndpoint {
2472            protocol: "p2p".to_string(),
2473            address: "127.0.0.1".to_string(),
2474            port: Some(9000),
2475            tls: true,
2476            auth_required: true,
2477        };
2478
2479        let serialized = serde_json::to_string(&endpoint).unwrap();
2480        let deserialized: MCPEndpoint = serde_json::from_str(&serialized).unwrap();
2481
2482        assert_eq!(deserialized.protocol, "p2p");
2483        assert_eq!(deserialized.address, "127.0.0.1");
2484        assert_eq!(deserialized.port, Some(9000));
2485        assert!(deserialized.tls);
2486        assert!(deserialized.auth_required);
2487    }
2488
2489    #[tokio::test]
2490    async fn test_mcp_service_metadata() {
2491        let load_metrics = ServiceLoadMetrics {
2492            active_requests: 5,
2493            requests_per_second: 10.5,
2494            avg_response_time_ms: 250.0,
2495            error_rate: 0.01,
2496            cpu_usage: 45.5,
2497            memory_usage: 1024 * 1024 * 100, // 100MB
2498        };
2499
2500        let metadata = MCPServiceMetadata {
2501            name: "test_service".to_string(),
2502            version: "2.1.0".to_string(),
2503            description: Some("A test service for unit testing".to_string()),
2504            tags: vec!["test".to_string(), "unit".to_string(), "mcp".to_string()],
2505            health_status: ServiceHealthStatus::Healthy,
2506            load_metrics,
2507        };
2508
2509        // Test serialization
2510        let serialized = serde_json::to_string(&metadata).unwrap();
2511        let deserialized: MCPServiceMetadata = serde_json::from_str(&serialized).unwrap();
2512
2513        assert_eq!(deserialized.name, "test_service");
2514        assert_eq!(deserialized.version, "2.1.0");
2515        assert_eq!(deserialized.description, Some("A test service for unit testing".to_string()));
2516        assert_eq!(deserialized.tags, vec!["test", "unit", "mcp"]);
2517        assert_eq!(deserialized.health_status, ServiceHealthStatus::Healthy);
2518        assert_eq!(deserialized.load_metrics.active_requests, 5);
2519        assert_eq!(deserialized.load_metrics.requests_per_second, 10.5);
2520    }
2521
2522    #[tokio::test]
2523    async fn test_function_tool_handler() {
2524        // Test function tool handler creation and execution
2525        let handler = FunctionToolHandler::new(|args: Value| async move {
2526            let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("world");
2527            Ok(json!({"greeting": format!("Hello, {}!", name)}))
2528        });
2529
2530        let args = json!({"name": "Alice"});
2531        let result = handler.execute(args).await.unwrap();
2532        assert_eq!(result["greeting"], "Hello, Alice!");
2533
2534        // Test with missing argument
2535        let empty_args = json!({});
2536        let result = handler.execute(empty_args).await.unwrap();
2537        assert_eq!(result["greeting"], "Hello, world!");
2538    }
2539
2540    #[tokio::test] // NOTE(review): nothing below is awaited; confirm whether a Tokio runtime is needed here
2541    async fn test_mcp_service_creation() { // checks the field values of a service built by MCPService::new
2542        let service_id = "test_service_123".to_string();
2543        let node_id = "test_node_789".to_string();
2544
2545        let service = MCPService::new(service_id.clone(), node_id.clone()); // clones keep the originals for the comparisons below
2546
2547        assert_eq!(service.service_id, service_id); // both identifiers must come back unchanged
2548        assert_eq!(service.node_id, node_id);
2549        assert!(service.tools.is_empty()); // a freshly built service has no tools
2550        assert_eq!(service.metadata.name, "MCP Service"); // metadata values the constructor is expected to fill in
2551        assert_eq!(service.metadata.version, "1.0.0");
2552        assert_eq!(service.metadata.health_status, ServiceHealthStatus::Healthy);
2553        assert_eq!(service.endpoint.protocol, "p2p"); // endpoint expectations: p2p protocol, TLS off, auth off
2554        assert!(!service.endpoint.tls);
2555        assert!(!service.endpoint.auth_required);
2556    }
2557
2558    #[tokio::test]
2559    async fn test_mcp_capabilities_default() { // pins what MCPCapabilities::default() is expected to contain
2560        let capabilities = MCPCapabilities::default();
2561
2562        assert!(capabilities.tools.is_some()); // all four capability sections must be present
2563        assert!(capabilities.prompts.is_some());
2564        assert!(capabilities.resources.is_some());
2565        assert!(capabilities.logging.is_some());
2566
2567        let tools_cap = capabilities.tools.unwrap(); // cannot panic: is_some() was asserted above
2568        assert_eq!(tools_cap.list_changed, Some(true));
2569
2570        let resources_cap = capabilities.resources.unwrap();
2571        assert_eq!(resources_cap.subscribe, Some(true));
2572        assert_eq!(resources_cap.list_changed, Some(true));
2573
2574        let logging_cap = capabilities.logging.unwrap();
2575        let levels = logging_cap.levels.unwrap(); // not pre-checked: this unwrap fails the test if no levels are set
2576        assert!(levels.contains(&MCPLogLevel::Debug)); // only these four levels are checked; extras are not rejected
2577        assert!(levels.contains(&MCPLogLevel::Info));
2578        assert!(levels.contains(&MCPLogLevel::Warning));
2579        assert!(levels.contains(&MCPLogLevel::Error));
2580    }
2581
2582    #[tokio::test]
2583    async fn test_mcp_request_creation() { // builds an MCPRequest literal and reads the asserted fields back
2584        let source_peer = "source_peer_123".to_string();
2585        let target_peer = "target_peer_456".to_string();
2586
2587        let request = MCPRequest {
2588            request_id: uuid::Uuid::new_v4().to_string(), // fresh UUID string; its value is never asserted
2589            source_peer: source_peer.clone(),
2590            target_peer: target_peer.clone(),
2591            message: MCPMessage::ListTools { cursor: None },
2592            timestamp: SystemTime::now(), // set from the current time and not asserted
2593            timeout: Duration::from_secs(30),
2594            auth_token: Some("test_token".to_string()),
2595        };
2596
2597        assert_eq!(request.source_peer, source_peer);
2598        assert_eq!(request.target_peer, target_peer);
2599        assert_eq!(request.timeout, Duration::from_secs(30));
2600        assert_eq!(request.auth_token, Some("test_token".to_string()));
2601
2602        match request.message { // the payload must still be ListTools with no cursor
2603            MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
2604            _ => panic!("Wrong message type"), // any other variant fails the test
2605        }
2606    }
2607
2608    #[tokio::test]
2609    async fn test_p2p_message_types() { // NOTE(review): assumes the four variants below are the full set; confirm against the enum
2610        // Each listed variant is compared with itself; NOTE(review): this only exercises equality and cannot tell variants apart
2611        assert_eq!(P2PMCPMessageType::Request, P2PMCPMessageType::Request);
2612        assert_eq!(P2PMCPMessageType::Response, P2PMCPMessageType::Response);
2613        assert_eq!(P2PMCPMessageType::ServiceAdvertisement, P2PMCPMessageType::ServiceAdvertisement);
2614        assert_eq!(P2PMCPMessageType::ServiceDiscovery, P2PMCPMessageType::ServiceDiscovery);
2615
2616        // JSON round-trip: every listed variant must deserialize back to a value equal to the original
2617        for msg_type in [
2618            P2PMCPMessageType::Request,
2619            P2PMCPMessageType::Response,
2620            P2PMCPMessageType::ServiceAdvertisement,
2621            P2PMCPMessageType::ServiceDiscovery,
2622        ] {
2623            let serialized = serde_json::to_string(&msg_type).unwrap(); // a serialization error fails the test
2624            let deserialized: P2PMCPMessageType = serde_json::from_str(&serialized).unwrap();
2625            assert_eq!(msg_type, deserialized);
2626        }
2627    }
2628}