saorsa_core/
mcp.rs

1//! Model Context Protocol (MCP) Server Implementation
2//!
3//! This module provides a fully-featured MCP server that integrates with the P2P network,
4//! enabling AI agents to discover and call tools across the distributed network.
5//! 
6//! The implementation includes:
7//! - MCP message routing over P2P transport
8//! - Tool registration and discovery through DHT
9//! - Security and authentication for remote calls
10//! - Service health monitoring and load balancing
11
12pub mod security;
13
14use crate::dht::{Key, DHT};
15use crate::{PeerId, Result, P2PError};
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value};
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime, Instant};
21use tokio::sync::{RwLock, mpsc, oneshot};
22use tokio::time::timeout;
23use tracing::{debug, info, warn};
24use rand;
25
26pub use security::*;
27
28/// MCP protocol version
29pub const MCP_VERSION: &str = "2024-11-05";
30
31/// Maximum message size for MCP calls (1MB)
32pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
33
34/// Default timeout for MCP calls
35pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);
36
37/// MCP protocol identifier for P2P messaging
38pub const MCP_PROTOCOL: &str = "/p2p-foundation/mcp/1.0.0";
39
40/// Network message sender trait for MCP server
41#[async_trait::async_trait]
42pub trait NetworkSender: Send + Sync {
43    /// Send a message to a specific peer via the P2P network
44    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
45    
46    /// Get our local peer ID
47    fn local_peer_id(&self) -> &PeerId;
48}
49
50/// Message sender function type for simplified network integration
51pub type MessageSender = Arc<dyn Fn(&PeerId, &str, Vec<u8>) -> Result<()> + Send + Sync>;
52
53/// Service discovery refresh interval
54pub const SERVICE_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
55
56/// MCP message types
57#[derive(Debug, Clone, Serialize, Deserialize)]
58#[serde(tag = "type", rename_all = "snake_case")]
59pub enum MCPMessage {
60    /// Initialize MCP session
61    Initialize {
62        /// MCP protocol version being used
63        protocol_version: String,
64        /// Client capabilities for this session
65        capabilities: MCPCapabilities,
66        /// Information about the connecting client
67        client_info: MCPClientInfo,
68    },
69    /// Initialize response
70    InitializeResult {
71        /// MCP protocol version the server supports
72        protocol_version: String,
73        /// Server capabilities for this session
74        capabilities: MCPCapabilities,
75        /// Information about the MCP server
76        server_info: MCPServerInfo,
77    },
78    /// List available tools
79    ListTools {
80        /// Pagination cursor for large tool lists
81        cursor: Option<String>,
82    },
83    /// List tools response
84    ListToolsResult {
85        /// Available tools on this server
86        tools: Vec<MCPTool>,
87        /// Next pagination cursor if more tools available
88        next_cursor: Option<String>,
89    },
90    /// Call a tool
91    CallTool {
92        /// Name of the tool to call
93        name: String,
94        /// Arguments to pass to the tool
95        arguments: Value,
96    },
97    /// Tool call response
98    CallToolResult {
99        /// Content returned by the tool
100        content: Vec<MCPContent>,
101        /// Whether the call resulted in an error
102        is_error: bool,
103    },
104    /// List available prompts
105    ListPrompts {
106        /// Pagination cursor for large prompt lists
107        cursor: Option<String>,
108    },
109    /// List prompts response
110    ListPromptsResult {
111        /// Available prompts on this server
112        prompts: Vec<MCPPrompt>,
113        /// Next pagination cursor if more prompts available
114        next_cursor: Option<String>,
115    },
116    /// Get a prompt
117    GetPrompt {
118        /// Name of the prompt to retrieve
119        name: String,
120        /// Arguments to customize the prompt
121        arguments: Option<Value>,
122    },
123    /// Get prompt response
124    GetPromptResult {
125        /// Description of the prompt
126        description: Option<String>,
127        /// Prompt messages/content
128        messages: Vec<MCPPromptMessage>,
129    },
130    /// List available resources
131    ListResources {
132        /// Pagination cursor for large resource lists
133        cursor: Option<String>,
134    },
135    /// List resources response
136    ListResourcesResult {
137        /// Available resources on this server
138        resources: Vec<MCPResource>,
139        /// Next pagination cursor if more resources available
140        next_cursor: Option<String>,
141    },
142    /// Read a resource
143    ReadResource {
144        /// URI of the resource to read
145        uri: String,
146    },
147    /// Read resource response
148    ReadResourceResult {
149        /// Contents of the requested resource
150        contents: Vec<MCPResourceContent>,
151    },
152    /// Subscribe to resource
153    SubscribeResource {
154        /// URI of the resource to subscribe to
155        uri: String,
156    },
157    /// Unsubscribe from resource
158    UnsubscribeResource {
159        /// URI of the resource to unsubscribe from
160        uri: String,
161    },
162    /// Resource updated notification
163    ResourceUpdated {
164        /// URI of the resource that was updated
165        uri: String,
166    },
167    /// List logs
168    ListLogs {
169        /// Pagination cursor for large log lists
170        cursor: Option<String>,
171    },
172    /// List logs response
173    ListLogsResult {
174        /// Log entries available on this server
175        logs: Vec<MCPLogEntry>,
176        /// Next pagination cursor if more logs available
177        next_cursor: Option<String>,
178    },
179    /// Set log level
180    SetLogLevel {
181        /// Log level to set for the server
182        level: MCPLogLevel,
183    },
184    /// Error response
185    Error {
186        /// Error code identifying the type of error
187        code: i32,
188        /// Human-readable error message
189        message: String,
190        /// Optional additional error data
191        data: Option<Value>,
192    },
193}
194
195/// MCP capabilities
196#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct MCPCapabilities {
198    /// Experimental capabilities
199    pub experimental: Option<Value>,
200    /// Sampling capability
201    pub sampling: Option<Value>,
202    /// Tools capability
203    pub tools: Option<MCPToolsCapability>,
204    /// Prompts capability
205    pub prompts: Option<MCPPromptsCapability>,
206    /// Resources capability
207    pub resources: Option<MCPResourcesCapability>,
208    /// Logging capability
209    pub logging: Option<MCPLoggingCapability>,
210}
211
212/// Tools capability configuration
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct MCPToolsCapability {
215    /// Whether tools are supported
216    pub list_changed: Option<bool>,
217}
218
219/// Prompts capability configuration
220#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct MCPPromptsCapability {
222    /// Whether prompts are supported
223    pub list_changed: Option<bool>,
224}
225
226/// Resources capability configuration
227#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct MCPResourcesCapability {
229    /// Whether resources are supported
230    pub subscribe: Option<bool>,
231    /// Whether resource listing is supported
232    pub list_changed: Option<bool>,
233}
234
235/// Logging capability configuration
236#[derive(Debug, Clone, Serialize, Deserialize)]
237pub struct MCPLoggingCapability {
238    /// Available log levels
239    pub levels: Option<Vec<MCPLogLevel>>,
240}
241
242/// MCP client information
243#[derive(Debug, Clone, Serialize, Deserialize)]
244pub struct MCPClientInfo {
245    /// Client name
246    pub name: String,
247    /// Client version
248    pub version: String,
249}
250
251/// MCP server information
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct MCPServerInfo {
254    /// Server name
255    pub name: String,
256    /// Server version
257    pub version: String,
258}
259
260/// MCP tool definition
261#[derive(Debug, Clone, Serialize, Deserialize)]
262pub struct MCPTool {
263    /// Tool name
264    pub name: String,
265    /// Tool description
266    pub description: String,
267    /// Input schema (JSON Schema)
268    pub input_schema: Value,
269}
270
271/// MCP tool implementation
272pub struct Tool {
273    /// Tool definition
274    pub definition: MCPTool,
275    /// Tool handler function
276    pub handler: Box<dyn ToolHandler + Send + Sync>,
277    /// Tool metadata
278    pub metadata: ToolMetadata,
279}
280
281/// Tool metadata for tracking and optimization
282#[derive(Debug, Clone)]
283pub struct ToolMetadata {
284    /// Tool creation time
285    pub created_at: SystemTime,
286    /// Last call time
287    pub last_called: Option<SystemTime>,
288    /// Total number of calls
289    pub call_count: u64,
290    /// Average execution time
291    pub avg_execution_time: Duration,
292    /// Tool health status
293    pub health_status: ToolHealthStatus,
294    /// Tags for categorization
295    pub tags: Vec<String>,
296}
297
298/// Tool health status
299#[derive(Debug, Clone, Copy, PartialEq)]
300pub enum ToolHealthStatus {
301    /// Tool is healthy and responsive
302    Healthy,
303    /// Tool is experiencing issues
304    Degraded,
305    /// Tool is not responding
306    Unhealthy,
307    /// Tool is disabled
308    Disabled,
309}
310
311
312/// Health monitoring configuration
313#[derive(Debug, Clone, Serialize, Deserialize)]
314pub struct HealthMonitorConfig {
315    /// Interval between health checks
316    pub health_check_interval: Duration,
317    /// Timeout for health check requests
318    pub health_check_timeout: Duration,
319    /// Number of consecutive failures before marking unhealthy
320    pub failure_threshold: u32,
321    /// Number of consecutive successes to mark healthy again
322    pub success_threshold: u32,
323    /// Interval for sending heartbeats
324    pub heartbeat_interval: Duration,
325    /// Maximum age of last heartbeat before considering service stale
326    pub heartbeat_timeout: Duration,
327    /// Whether to enable health monitoring
328    pub enabled: bool,
329}
330
331impl Default for HealthMonitorConfig {
332    fn default() -> Self {
333        Self {
334            health_check_interval: Duration::from_secs(30),
335            health_check_timeout: Duration::from_secs(5),
336            failure_threshold: 3,
337            success_threshold: 2,
338            heartbeat_interval: Duration::from_secs(60),
339            heartbeat_timeout: Duration::from_secs(300), // 5 minutes
340            enabled: true,
341        }
342    }
343}
344
345/// Service health information
346#[derive(Debug, Clone, Serialize, Deserialize)]
347pub struct ServiceHealth {
348    /// Service identifier
349    pub service_id: String,
350    /// Current health status
351    pub status: ServiceHealthStatus,
352    /// Last successful health check
353    pub last_health_check: Option<SystemTime>,
354    /// Last heartbeat received
355    pub last_heartbeat: Option<SystemTime>,
356    /// Consecutive failure count
357    pub failure_count: u32,
358    /// Consecutive success count
359    pub success_count: u32,
360    /// Average response time for health checks
361    pub avg_response_time: Duration,
362    /// Error message if unhealthy
363    pub error_message: Option<String>,
364    /// Health check history (last 10 checks)
365    pub health_history: Vec<HealthCheckResult>,
366}
367
368/// Result of a health check
369#[derive(Debug, Clone, Serialize, Deserialize)]
370pub struct HealthCheckResult {
371    /// Timestamp of the check
372    pub timestamp: SystemTime,
373    /// Whether the check was successful
374    pub success: bool,
375    /// Response time
376    pub response_time: Duration,
377    /// Error message if failed
378    pub error_message: Option<String>,
379}
380
381/// Heartbeat message
382#[derive(Debug, Clone, Serialize, Deserialize)]
383pub struct Heartbeat {
384    /// Service ID sending the heartbeat
385    pub service_id: String,
386    /// Peer ID of the service
387    pub peer_id: PeerId,
388    /// Timestamp when heartbeat was sent
389    pub timestamp: SystemTime,
390    /// Current service load (0.0 to 1.0)
391    pub load: f32,
392    /// Available tools
393    pub available_tools: Vec<String>,
394    /// Service capabilities
395    pub capabilities: MCPCapabilities,
396}
397
398/// Health monitoring events
399#[derive(Debug, Clone)]
400pub enum HealthEvent {
401    /// Service became healthy
402    ServiceHealthy {
403        service_id: String,
404        peer_id: PeerId,
405    },
406    /// Service became unhealthy
407    ServiceUnhealthy {
408        service_id: String,
409        peer_id: PeerId,
410        error: String,
411    },
412    /// Service status changed to degraded
413    ServiceDegraded {
414        service_id: String,
415        peer_id: PeerId,
416        reason: String,
417    },
418    /// Heartbeat received
419    HeartbeatReceived {
420        service_id: String,
421        peer_id: PeerId,
422        load: f32,
423    },
424    /// Heartbeat timeout
425    HeartbeatTimeout {
426        service_id: String,
427        peer_id: PeerId,
428    },
429}
430
431/// Tool handler trait
432pub trait ToolHandler {
433    /// Execute the tool with given arguments
434    fn execute(&self, arguments: Value) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>>;
435    
436    /// Validate tool arguments
437    fn validate(&self, arguments: &Value) -> Result<()> {
438        // Default implementation - no validation
439        let _ = arguments;
440        Ok(())
441    }
442    
443    /// Get tool resource requirements
444    fn get_requirements(&self) -> ToolRequirements {
445        ToolRequirements::default()
446    }
447}
448
449/// Tool resource requirements
450#[derive(Debug, Clone)]
451pub struct ToolRequirements {
452    /// Maximum memory usage in bytes
453    pub max_memory: Option<u64>,
454    /// Maximum execution time allowed for tool calls
455    pub max_execution_time: Option<Duration>,
456    /// Required capabilities that must be available
457    pub required_capabilities: Vec<String>,
458    /// Whether this tool requires network access
459    pub requires_network: bool,
460    /// Whether this tool requires file system access
461    pub requires_filesystem: bool,
462}
463
464impl Default for ToolRequirements {
465    fn default() -> Self {
466        Self {
467            max_memory: Some(100 * 1024 * 1024), // 100MB default
468            max_execution_time: Some(Duration::from_secs(30)),
469            required_capabilities: Vec::new(),
470            requires_network: false,
471            requires_filesystem: false,
472        }
473    }
474}
475
476/// MCP content types
477#[derive(Debug, Clone, Serialize, Deserialize)]
478#[serde(tag = "type", rename_all = "snake_case")]
479pub enum MCPContent {
480    /// Text content
481    Text {
482        /// The text content
483        text: String,
484    },
485    /// Image content
486    Image {
487        /// Base64-encoded image data
488        data: String,
489        /// MIME type of the image
490        mime_type: String,
491    },
492    /// Resource content
493    Resource {
494        /// Reference to an MCP resource
495        resource: MCPResourceReference,
496    },
497}
498
499/// MCP resource reference
500#[derive(Debug, Clone, Serialize, Deserialize)]
501pub struct MCPResourceReference {
502    /// Resource URI
503    pub uri: String,
504    /// Resource type
505    pub type_: Option<String>,
506}
507
508/// MCP prompt definition
509#[derive(Debug, Clone, Serialize, Deserialize)]
510pub struct MCPPrompt {
511    /// Prompt name
512    pub name: String,
513    /// Prompt description
514    pub description: Option<String>,
515    /// Prompt arguments schema
516    pub arguments: Option<Value>,
517}
518
519/// MCP prompt message
520#[derive(Debug, Clone, Serialize, Deserialize)]
521pub struct MCPPromptMessage {
522    /// Message role
523    pub role: MCPRole,
524    /// Message content
525    pub content: MCPContent,
526}
527
528/// MCP message roles
529#[derive(Debug, Clone, Serialize, Deserialize)]
530#[serde(rename_all = "snake_case")]
531pub enum MCPRole {
532    /// User message
533    User,
534    /// Assistant message
535    Assistant,
536    /// System message
537    System,
538}
539
540/// MCP resource definition
541#[derive(Debug, Clone, Serialize, Deserialize)]
542pub struct MCPResource {
543    /// Resource URI
544    pub uri: String,
545    /// Resource name
546    pub name: String,
547    /// Resource description
548    pub description: Option<String>,
549    /// Resource MIME type
550    pub mime_type: Option<String>,
551}
552
553/// MCP resource content
554#[derive(Debug, Clone, Serialize, Deserialize)]
555pub struct MCPResourceContent {
556    /// Content URI
557    pub uri: String,
558    /// Content MIME type
559    pub mime_type: String,
560    /// Content data
561    pub text: Option<String>,
562    /// Binary content (base64 encoded)
563    pub blob: Option<String>,
564}
565
566/// MCP log levels
567#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
568#[serde(rename_all = "snake_case")]
569pub enum MCPLogLevel {
570    /// Debug level
571    Debug,
572    /// Info level
573    Info,
574    /// Notice level
575    Notice,
576    /// Warning level
577    Warning,
578    /// Error level
579    Error,
580    /// Critical level
581    Critical,
582    /// Alert level
583    Alert,
584    /// Emergency level
585    Emergency,
586}
587
588/// MCP log entry
589#[derive(Debug, Clone, Serialize, Deserialize)]
590pub struct MCPLogEntry {
591    /// Log level
592    pub level: MCPLogLevel,
593    /// Log message
594    pub data: Value,
595    /// Logger name
596    pub logger: Option<String>,
597}
598
599/// MCP service descriptor for discovery
600#[derive(Debug, Clone, Serialize, Deserialize)]
601pub struct MCPService {
602    /// Service identifier
603    pub service_id: String,
604    /// Node providing the service
605    pub node_id: PeerId,
606    /// Available tools
607    pub tools: Vec<String>,
608    /// Service capabilities
609    pub capabilities: MCPCapabilities,
610    /// Service metadata
611    pub metadata: MCPServiceMetadata,
612    /// Service registration time
613    pub registered_at: SystemTime,
614    /// Service endpoint information
615    pub endpoint: MCPEndpoint,
616}
617
618/// MCP service metadata
619#[derive(Debug, Clone, Serialize, Deserialize)]
620pub struct MCPServiceMetadata {
621    /// Service name
622    pub name: String,
623    /// Service version
624    pub version: String,
625    /// Service description
626    pub description: Option<String>,
627    /// Service tags
628    pub tags: Vec<String>,
629    /// Service health status
630    pub health_status: ServiceHealthStatus,
631    /// Service load metrics
632    pub load_metrics: ServiceLoadMetrics,
633}
634
635/// Service health status
636#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
637pub enum ServiceHealthStatus {
638    /// Service is healthy and responsive
639    Healthy,
640    /// Service is experiencing degraded performance
641    Degraded,
642    /// Service is not responding to health checks
643    Unhealthy,
644    /// Service is disabled/maintenance mode
645    Disabled,
646    /// Service status is unknown
647    Unknown,
648}
649
650/// Service load metrics
651#[derive(Debug, Clone, Serialize, Deserialize)]
652pub struct ServiceLoadMetrics {
653    /// Current active requests
654    pub active_requests: u32,
655    /// Requests per second
656    pub requests_per_second: f64,
657    /// Average response time in milliseconds
658    pub avg_response_time_ms: f64,
659    /// Error rate (0.0-1.0)
660    pub error_rate: f64,
661    /// CPU usage percentage
662    pub cpu_usage: f64,
663    /// Memory usage in bytes
664    pub memory_usage: u64,
665}
666
667/// MCP endpoint information
668#[derive(Debug, Clone, Serialize, Deserialize)]
669pub struct MCPEndpoint {
670    /// Endpoint protocol (p2p, http, etc.)
671    pub protocol: String,
672    /// Endpoint address
673    pub address: String,
674    /// Endpoint port
675    pub port: Option<u16>,
676    /// TLS enabled
677    pub tls: bool,
678    /// Authentication required
679    pub auth_required: bool,
680}
681
682/// MCP request with routing information
683#[derive(Debug, Clone)]
684pub struct MCPRequest {
685    /// Request ID
686    pub request_id: String,
687    /// Source peer
688    pub source_peer: PeerId,
689    /// Target peer
690    pub target_peer: PeerId,
691    /// MCP message
692    pub message: MCPMessage,
693    /// Request timestamp
694    pub timestamp: SystemTime,
695    /// Request timeout
696    pub timeout: Duration,
697    /// Authentication token
698    pub auth_token: Option<String>,
699}
700
701/// P2P MCP message wrapper for network transmission
702#[derive(Debug, Clone, Serialize, Deserialize)]
703pub struct P2PMCPMessage {
704    /// Message type
705    pub message_type: P2PMCPMessageType,
706    /// Request/Response ID for correlation
707    pub message_id: String,
708    /// Source peer ID
709    pub source_peer: PeerId,
710    /// Target peer ID (optional for broadcasts)
711    pub target_peer: Option<PeerId>,
712    /// Timestamp
713    pub timestamp: u64,
714    /// MCP message payload
715    pub payload: MCPMessage,
716    /// Message TTL for routing
717    pub ttl: u8,
718}
719
720/// P2P MCP message types
721#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
722pub enum P2PMCPMessageType {
723    /// Direct request to a specific peer
724    Request,
725    /// Response to a request
726    Response,
727    /// Service advertisement
728    ServiceAdvertisement,
729    /// Service discovery query
730    ServiceDiscovery,
731    /// Heartbeat notification
732    Heartbeat,
733    /// Health check request
734    HealthCheck,
735}
736
737/// MCP response with metadata
738#[derive(Debug, Clone)]
739pub struct MCPResponse {
740    /// Request ID this response corresponds to
741    pub request_id: String,
742    /// Response message
743    pub message: MCPMessage,
744    /// Response timestamp
745    pub timestamp: SystemTime,
746    /// Processing time
747    pub processing_time: Duration,
748}
749
750/// MCP call context
751#[derive(Debug, Clone)]
752pub struct MCPCallContext {
753    /// Caller peer ID
754    pub caller_id: PeerId,
755    /// Call timestamp
756    pub timestamp: SystemTime,
757    /// Call timeout
758    pub timeout: Duration,
759    /// Authentication information
760    pub auth_info: Option<MCPAuthInfo>,
761    /// Call metadata
762    pub metadata: HashMap<String, String>,
763}
764
765/// MCP authentication information
766#[derive(Debug, Clone)]
767pub struct MCPAuthInfo {
768    /// Authentication token
769    pub token: String,
770    /// Token type
771    pub token_type: String,
772    /// Token expiration
773    pub expires_at: Option<SystemTime>,
774    /// Granted permissions
775    pub permissions: Vec<String>,
776}
777
778/// MCP server configuration
779#[derive(Debug, Clone, Serialize, Deserialize)]
780pub struct MCPServerConfig {
781    /// Server name
782    pub server_name: String,
783    /// Server version
784    pub server_version: String,
785    /// Enable tool discovery via DHT
786    pub enable_dht_discovery: bool,
787    /// Maximum concurrent requests
788    pub max_concurrent_requests: usize,
789    /// Request timeout
790    pub request_timeout: Duration,
791    /// Enable authentication
792    pub enable_auth: bool,
793    /// Enable rate limiting
794    pub enable_rate_limiting: bool,
795    /// Rate limit: requests per minute
796    pub rate_limit_rpm: u32,
797    /// Enable request logging
798    pub enable_logging: bool,
799    /// Maximum tool execution time
800    pub max_tool_execution_time: Duration,
801    /// Tool memory limit
802    pub tool_memory_limit: u64,
803    /// Health monitoring configuration
804    pub health_monitor: HealthMonitorConfig,
805}
806
807impl Default for MCPServerConfig {
808    fn default() -> Self {
809        Self {
810            server_name: "P2P-MCP-Server".to_string(),
811            server_version: crate::VERSION.to_string(),
812            enable_dht_discovery: true,
813            max_concurrent_requests: 100,
814            request_timeout: DEFAULT_CALL_TIMEOUT,
815            enable_auth: true,
816            enable_rate_limiting: true,
817            rate_limit_rpm: 60,
818            enable_logging: true,
819            max_tool_execution_time: Duration::from_secs(30),
820            tool_memory_limit: 100 * 1024 * 1024, // 100MB
821            health_monitor: HealthMonitorConfig::default(),
822        }
823    }
824}
825
826/// Main MCP server implementation
827pub struct MCPServer {
828    /// Server configuration
829    config: MCPServerConfig,
830    /// Registered tools
831    tools: Arc<RwLock<HashMap<String, Tool>>>,
832    /// Registered prompts (reserved for future implementation)
833    #[allow(dead_code)]
834    prompts: Arc<RwLock<HashMap<String, MCPPrompt>>>,
835    /// Registered resources (reserved for future implementation)
836    #[allow(dead_code)]
837    resources: Arc<RwLock<HashMap<String, MCPResource>>>,
838    /// Active sessions
839    sessions: Arc<RwLock<HashMap<String, MCPSession>>>,
840    /// Request handlers
841    request_handlers: Arc<RwLock<HashMap<String, oneshot::Sender<MCPResponse>>>>,
842    /// DHT reference for service discovery
843    dht: Option<Arc<RwLock<DHT>>>,
844    /// Local service registry
845    local_services: Arc<RwLock<HashMap<String, MCPService>>>,
846    /// Remote service cache
847    remote_services: Arc<RwLock<HashMap<String, MCPService>>>,
848    /// Request statistics
849    stats: Arc<RwLock<MCPServerStats>>,
850    /// Message channel for incoming requests
851    request_tx: mpsc::UnboundedSender<MCPRequest>,
852    /// Message channel for outgoing responses (reserved for future implementation)
853    #[allow(dead_code)]
854    response_rx: Arc<RwLock<mpsc::UnboundedReceiver<MCPResponse>>>,
855    /// Security manager
856    security_manager: Option<Arc<MCPSecurityManager>>,
857    /// Audit logger
858    audit_logger: Arc<SecurityAuditLogger>,
859    /// Network sender for P2P communication
860    network_sender: Arc<RwLock<Option<Arc<dyn NetworkSender>>>>,
861    /// Service health tracking
862    service_health: Arc<RwLock<HashMap<String, ServiceHealth>>>,
863    /// Health event sender
864    health_event_tx: mpsc::UnboundedSender<HealthEvent>,
865    /// Health event receiver (for monitoring)
866    #[allow(dead_code)]
867    health_event_rx: Arc<RwLock<mpsc::UnboundedReceiver<HealthEvent>>>,
868}
869
870/// MCP session information
871#[derive(Debug, Clone)]
872pub struct MCPSession {
873    /// Session ID
874    pub session_id: String,
875    /// Peer ID
876    pub peer_id: PeerId,
877    /// Client capabilities
878    pub client_capabilities: Option<MCPCapabilities>,
879    /// Session start time
880    pub started_at: SystemTime,
881    /// Last activity time
882    pub last_activity: SystemTime,
883    /// Session state
884    pub state: MCPSessionState,
885    /// Subscribed resources
886    pub subscribed_resources: Vec<String>,
887}
888
889/// MCP session state
890#[derive(Debug, Clone, PartialEq)]
891pub enum MCPSessionState {
892    /// Session is initializing
893    Initializing,
894    /// Session is active
895    Active,
896    /// Session is inactive
897    Inactive,
898    /// Session is terminated
899    Terminated,
900}
901
902/// MCP server statistics
903#[derive(Debug, Clone)]
904pub struct MCPServerStats {
905    /// Total requests processed
906    pub total_requests: u64,
907    /// Total responses sent
908    pub total_responses: u64,
909    /// Total errors
910    pub total_errors: u64,
911    /// Average response time
912    pub avg_response_time: Duration,
913    /// Active sessions
914    pub active_sessions: u32,
915    /// Total tools registered
916    pub total_tools: u32,
917    /// Most called tools
918    pub popular_tools: HashMap<String, u64>,
919    /// Server start time
920    pub server_started_at: SystemTime,
921}
922
923impl Default for MCPServerStats {
924    fn default() -> Self {
925        Self {
926            total_requests: 0,
927            total_responses: 0,
928            total_errors: 0,
929            avg_response_time: Duration::from_millis(0),
930            active_sessions: 0,
931            total_tools: 0,
932            popular_tools: HashMap::new(),
933            server_started_at: SystemTime::now(),
934        }
935    }
936}
937
938impl MCPServer {
939    /// Create a new MCP server
940    pub fn new(config: MCPServerConfig) -> Self {
941        let (request_tx, _request_rx) = mpsc::unbounded_channel();
942        let (_response_tx, response_rx) = mpsc::unbounded_channel();
943        let (health_event_tx, health_event_rx) = mpsc::unbounded_channel();
944        
945        // Initialize security manager if authentication is enabled
946        let security_manager = if config.enable_auth {
947            // Generate a random secret key for token signing
948            let secret_key = (0..32).map(|_| rand::random::<u8>()).collect();
949            Some(Arc::new(MCPSecurityManager::new(secret_key, config.rate_limit_rpm)))
950        } else {
951            None
952        };
953        
954        let server = Self {
955            config,
956            tools: Arc::new(RwLock::new(HashMap::new())),
957            prompts: Arc::new(RwLock::new(HashMap::new())),
958            resources: Arc::new(RwLock::new(HashMap::new())),
959            sessions: Arc::new(RwLock::new(HashMap::new())),
960            request_handlers: Arc::new(RwLock::new(HashMap::new())),
961            dht: None,
962            local_services: Arc::new(RwLock::new(HashMap::new())),
963            remote_services: Arc::new(RwLock::new(HashMap::new())),
964            stats: Arc::new(RwLock::new(MCPServerStats::default())),
965            request_tx,
966            response_rx: Arc::new(RwLock::new(response_rx)),
967            security_manager,
968            audit_logger: Arc::new(SecurityAuditLogger::new(10000)), // Keep 10k audit entries
969            network_sender: Arc::new(RwLock::new(None)),
970            service_health: Arc::new(RwLock::new(HashMap::new())),
971            health_event_tx,
972            health_event_rx: Arc::new(RwLock::new(health_event_rx)),
973        };
974        
975        server
976    }
977    
978    /// Create MCP server with DHT integration
979    pub fn with_dht(mut self, dht: Arc<RwLock<DHT>>) -> Self {
980        self.dht = Some(dht);
981        self
982    }
983    
984    /// Set the network sender for P2P communication
985    pub async fn with_network_sender(self, sender: Arc<dyn NetworkSender>) -> Self {
986        *self.network_sender.write().await = Some(sender);
987        self
988    }
989    
990    /// Set the network sender for P2P communication (async method for post-creation setup)
991    pub async fn set_network_sender(&self, sender: Arc<dyn NetworkSender>) {
992        let peer_id = sender.local_peer_id().clone();
993        *self.network_sender.write().await = Some(sender);
994        info!("MCP server network sender configured for peer {}", peer_id);
995    }
996    
997    /// Start the MCP server
998    pub async fn start(&self) -> Result<()> {
999        info!("Starting MCP server: {}", self.config.server_name);
1000        
1001        // Start request processing task
1002        self.start_request_processor().await?;
1003        
1004        // Start service discovery if DHT is available
1005        if self.dht.is_some() {
1006            self.start_service_discovery().await?;
1007        }
1008        
1009        // Start health monitoring
1010        self.start_health_monitor().await?;
1011        
1012        info!("MCP server started successfully");
1013        Ok(())
1014    }
1015    
1016    /// Register a tool
1017    pub async fn register_tool(&self, tool: Tool) -> Result<()> {
1018        let tool_name = tool.definition.name.clone();
1019        
1020        // Validate tool
1021        self.validate_tool(&tool).await?;
1022        
1023        // Register locally
1024        {
1025            let mut tools = self.tools.write().await;
1026            tools.insert(tool_name.clone(), tool);
1027        }
1028        
1029        // Update statistics
1030        {
1031            let mut stats = self.stats.write().await;
1032            stats.total_tools += 1;
1033        }
1034        
1035        // Register in DHT if available
1036        if let Some(dht) = &self.dht {
1037            self.register_tool_in_dht(&tool_name, dht).await?;
1038        }
1039        
1040        // Announce updated service with new tool
1041        if let Err(e) = self.announce_local_services().await {
1042            warn!("Failed to announce service after tool registration: {}", e);
1043        }
1044        
1045        info!("Registered tool: {}", tool_name);
1046        Ok(())
1047    }
1048    
1049    /// Validate tool before registration
1050    async fn validate_tool(&self, tool: &Tool) -> Result<()> {
1051        // Check for duplicate names
1052        let tools = self.tools.read().await;
1053        if tools.contains_key(&tool.definition.name) {
1054            return Err(P2PError::MCP(format!("Tool already exists: {}", tool.definition.name)).into());
1055        }
1056        
1057        // Validate tool name
1058        if tool.definition.name.is_empty() || tool.definition.name.len() > 100 {
1059            return Err(P2PError::MCP("Invalid tool name".to_string()).into());
1060        }
1061        
1062        // Validate schema
1063        if !tool.definition.input_schema.is_object() {
1064            return Err(P2PError::MCP("Tool input schema must be an object".to_string()).into());
1065        }
1066        
1067        Ok(())
1068    }
1069    
1070    /// Register tool in DHT for discovery
1071    async fn register_tool_in_dht(&self, tool_name: &str, dht: &Arc<RwLock<DHT>>) -> Result<()> {
1072        let key = Key::new(format!("mcp:tool:{}", tool_name).as_bytes());
1073        let service_info = json!({
1074            "tool_name": tool_name,
1075            "node_id": "local_node", // TODO: Get actual node ID
1076            "registered_at": SystemTime::now().duration_since(std::time::UNIX_EPOCH).map_err(|e| P2PError::Network(format!("Time error: {}", e)))?.as_secs(),
1077            "capabilities": self.get_server_capabilities().await
1078        });
1079        
1080        let dht_guard = dht.read().await;
1081        dht_guard.put(key, serde_json::to_vec(&service_info)?).await?;
1082        
1083        Ok(())
1084    }
1085    
1086    /// Get server capabilities
1087    async fn get_server_capabilities(&self) -> MCPCapabilities {
1088        MCPCapabilities {
1089            experimental: None,
1090            sampling: None,
1091            tools: Some(MCPToolsCapability {
1092                list_changed: Some(true),
1093            }),
1094            prompts: Some(MCPPromptsCapability {
1095                list_changed: Some(true),
1096            }),
1097            resources: Some(MCPResourcesCapability {
1098                subscribe: Some(true),
1099                list_changed: Some(true),
1100            }),
1101            logging: Some(MCPLoggingCapability {
1102                levels: Some(vec![
1103                    MCPLogLevel::Debug,
1104                    MCPLogLevel::Info,
1105                    MCPLogLevel::Warning,
1106                    MCPLogLevel::Error,
1107                ]),
1108            }),
1109        }
1110    }
1111    
1112    /// Call a tool by name
1113    pub async fn call_tool(&self, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
1114        let start_time = Instant::now();
1115        
1116        // Security checks
1117        
1118        // 1. Check rate limiting
1119        if !self.check_rate_limit(&context.caller_id).await? {
1120            return Err(P2PError::MCP("Rate limit exceeded".to_string()));
1121        }
1122        
1123        // 2. Check tool execution permission
1124        if !self.check_permission(&context.caller_id, &MCPPermission::ExecuteTools).await? {
1125            return Err(P2PError::MCP("Permission denied: execute tools".to_string()));
1126        }
1127        
1128        // 3. Check tool-specific security policy
1129        let tool_security_level = self.get_tool_security_policy(tool_name).await;
1130        let is_trusted = self.is_trusted_peer(&context.caller_id).await;
1131        
1132        match tool_security_level {
1133            SecurityLevel::Admin => {
1134                if !self.check_permission(&context.caller_id, &MCPPermission::Admin).await? {
1135                    return Err(P2PError::MCP("Permission denied: admin access required".to_string()));
1136                }
1137            }
1138            SecurityLevel::Strong => {
1139                if !is_trusted {
1140                    return Err(P2PError::MCP("Permission denied: trusted peer required".to_string()));
1141                }
1142            }
1143            SecurityLevel::Basic => {
1144                // Check if authentication is enabled and token is valid
1145                if self.config.enable_auth {
1146                    if let Some(auth_info) = &context.auth_info {
1147                        self.verify_auth_token(&auth_info.token).await?;
1148                    } else {
1149                        return Err(P2PError::MCP("Authentication required".to_string()));
1150                    }
1151                }
1152            }
1153            SecurityLevel::Public => {
1154                // No additional checks needed
1155            }
1156        }
1157        
1158        // Log the tool call attempt
1159        let mut details = HashMap::new();
1160        details.insert("action".to_string(), "tool_call".to_string());
1161        details.insert("tool_name".to_string(), tool_name.to_string());
1162        details.insert("security_level".to_string(), format!("{:?}", tool_security_level));
1163        
1164        self.audit_logger.log_event(
1165            "tool_execution".to_string(),
1166            context.caller_id.clone(),
1167            details,
1168            AuditSeverity::Info,
1169        ).await;
1170        
1171        // Check if tool exists
1172        let tool_exists = {
1173            let tools = self.tools.read().await;
1174            tools.contains_key(tool_name)
1175        };
1176        
1177        if !tool_exists {
1178            return Err(P2PError::MCP(format!("Tool not found: {}", tool_name)).into());
1179        }
1180        
1181        // Validate arguments and get requirements
1182        let requirements = {
1183            let tools = self.tools.read().await;
1184            let tool = tools.get(tool_name).unwrap(); // Safe because we checked exists above
1185            
1186            // Validate arguments
1187            if let Err(e) = tool.handler.validate(&arguments) {
1188                return Err(P2PError::MCP(format!("Tool validation failed: {}", e)).into());
1189            }
1190            
1191            // Get resource requirements
1192            tool.handler.get_requirements()
1193        };
1194        
1195        // Check resource requirements
1196        self.check_resource_requirements(&requirements).await?;
1197        
1198        // Execute tool in a spawned task to avoid borrow checker issues
1199        let tools_clone = self.tools.clone();
1200        let tool_name_owned = tool_name.to_string();
1201        let execution_timeout = context.timeout.min(requirements.max_execution_time.unwrap_or(context.timeout));
1202        
1203        let result = timeout(execution_timeout, async move {
1204            let tools = tools_clone.read().await;
1205            let tool = tools.get(&tool_name_owned).unwrap(); // Safe because we checked exists above
1206            tool.handler.execute(arguments).await
1207        }).await
1208        .map_err(|_| P2PError::MCP("Tool execution timeout".to_string()))?
1209        .map_err(|e| P2PError::MCP(format!("Tool execution failed: {}", e)))?;
1210        
1211        let execution_time = start_time.elapsed();
1212        
1213        // Update tool statistics
1214        self.update_tool_stats(tool_name, execution_time, true).await;
1215        
1216        // Update server statistics
1217        {
1218            let mut stats = self.stats.write().await;
1219            stats.total_requests += 1;
1220            stats.total_responses += 1;
1221            
1222            // Update average response time
1223            let new_total_time = stats.avg_response_time.mul_f64(stats.total_responses as f64 - 1.0) + execution_time;
1224            stats.avg_response_time = new_total_time.div_f64(stats.total_responses as f64);
1225            
1226            // Update popular tools
1227            *stats.popular_tools.entry(tool_name.to_string()).or_insert(0) += 1;
1228        }
1229        
1230        debug!("Tool '{}' executed in {:?}", tool_name, execution_time);
1231        Ok(result)
1232    }
1233    
1234    /// Check if resource requirements can be met
1235    async fn check_resource_requirements(&self, requirements: &ToolRequirements) -> Result<()> {
1236        // Check memory limit
1237        if let Some(max_memory) = requirements.max_memory {
1238            if max_memory > self.config.tool_memory_limit {
1239                return Err(P2PError::MCP("Tool memory requirement exceeds limit".to_string()).into());
1240            }
1241        }
1242        
1243        // Check execution time limit
1244        if let Some(max_execution_time) = requirements.max_execution_time {
1245            if max_execution_time > self.config.max_tool_execution_time {
1246                return Err(P2PError::MCP("Tool execution time requirement exceeds limit".to_string()).into());
1247            }
1248        }
1249        
1250        // TODO: Check other requirements (capabilities, network, filesystem)
1251        
1252        Ok(())
1253    }
1254    
1255    /// Update tool execution statistics
1256    async fn update_tool_stats(&self, tool_name: &str, execution_time: Duration, success: bool) {
1257        let mut tools = self.tools.write().await;
1258        if let Some(tool) = tools.get_mut(tool_name) {
1259            tool.metadata.call_count += 1;
1260            tool.metadata.last_called = Some(SystemTime::now());
1261            
1262            // Update average execution time
1263            let new_total_time = tool.metadata.avg_execution_time.mul_f64(tool.metadata.call_count as f64 - 1.0) + execution_time;
1264            tool.metadata.avg_execution_time = new_total_time.div_f64(tool.metadata.call_count as f64);
1265            
1266            // Update health status based on success
1267            if !success {
1268                tool.metadata.health_status = match tool.metadata.health_status {
1269                    ToolHealthStatus::Healthy => ToolHealthStatus::Degraded,
1270                    ToolHealthStatus::Degraded => ToolHealthStatus::Unhealthy,
1271                    other => other,
1272                };
1273            } else if tool.metadata.health_status != ToolHealthStatus::Disabled {
1274                tool.metadata.health_status = ToolHealthStatus::Healthy;
1275            }
1276        }
1277    }
1278    
1279    /// List available tools
1280    pub async fn list_tools(&self, _cursor: Option<String>) -> Result<(Vec<MCPTool>, Option<String>)> {
1281        let tools = self.tools.read().await;
1282        let tool_definitions: Vec<MCPTool> = tools.values()
1283            .map(|tool| tool.definition.clone())
1284            .collect();
1285        
1286        // For simplicity, return all tools without pagination
1287        // In a real implementation, you'd implement proper cursor-based pagination
1288        Ok((tool_definitions, None))
1289    }
1290    
1291    /// Start request processing task
1292    async fn start_request_processor(&self) -> Result<()> {
1293        let _request_tx = self.request_tx.clone();
1294        let _server_clone = Arc::new(self);
1295        
1296        tokio::spawn(async move {
1297            info!("MCP request processor started");
1298            
1299            // In a real implementation, this would listen for incoming MCP requests
1300            // and process them through a receiver channel. For now, we'll implement
1301            // the message handling infrastructure without the actual network loop.
1302            
1303            loop {
1304                // Sleep to prevent busy loop - in real implementation,
1305                // this would block on receiving messages
1306                tokio::time::sleep(Duration::from_millis(100)).await;
1307                
1308                // Check if we should shutdown
1309                // This is a placeholder - real implementation would have proper shutdown signaling
1310                break;
1311            }
1312            
1313            info!("MCP request processor stopped");
1314        });
1315        
1316        Ok(())
1317    }
1318    
1319    /// Start service discovery task
1320    async fn start_service_discovery(&self) -> Result<()> {
1321        if let Some(dht) = self.dht.clone() {
1322            let _stats = self.stats.clone();
1323            let remote_services = self.remote_services.clone();
1324            
1325            tokio::spawn(async move {
1326                info!("MCP service discovery started");
1327                
1328                loop {
1329                    // Periodically discover services
1330                    tokio::time::sleep(SERVICE_DISCOVERY_INTERVAL).await;
1331                    
1332                    // Query DHT for MCP services
1333                    let key = Key::new(b"mcp:services");
1334                    let dht_guard = dht.read().await;
1335                    
1336                    match dht_guard.get(&key).await {
1337                        Some(record) => {
1338                            match serde_json::from_slice::<Vec<MCPService>>(&record.value) {
1339                                Ok(services) => {
1340                                    debug!("Discovered {} MCP services", services.len());
1341                                    
1342                                    // Update remote services cache
1343                                    {
1344                                        let mut remote_cache = remote_services.write().await;
1345                                        for service in services {
1346                                            remote_cache.insert(service.service_id.clone(), service);
1347                                        }
1348                                    }
1349                                }
1350                                Err(e) => {
1351                                    debug!("Failed to deserialize services: {}", e);
1352                                }
1353                            }
1354                        }
1355                        None => {
1356                            debug!("No MCP services found in DHT");
1357                        }
1358                    }
1359                }
1360            });
1361        }
1362        
1363        Ok(())
1364    }
1365    
1366    /// Start health monitoring task
1367    async fn start_health_monitor(&self) -> Result<()> {
1368        if !self.config.health_monitor.enabled {
1369            debug!("Health monitoring is disabled");
1370            return Ok(());
1371        }
1372
1373        info!("Starting health monitoring with interval: {:?}", self.config.health_monitor.health_check_interval);
1374        
1375        // Clone necessary fields for the background task
1376        let service_health = Arc::clone(&self.service_health);
1377        let remote_services = Arc::clone(&self.remote_services);
1378        let network_sender = Arc::clone(&self.network_sender);
1379        let health_event_tx = self.health_event_tx.clone();
1380        let config = self.config.health_monitor.clone();
1381        
1382        // Start health check task
1383        let health_check_task = {
1384            let service_health = Arc::clone(&service_health);
1385            let remote_services = Arc::clone(&remote_services);
1386            let network_sender = Arc::clone(&network_sender);
1387            let health_event_tx = health_event_tx.clone();
1388            let config = config.clone();
1389            
1390            tokio::spawn(async move {
1391                let mut interval = tokio::time::interval(config.health_check_interval);
1392                
1393                loop {
1394                    interval.tick().await;
1395                    
1396                    // Get list of remote services to check
1397                    let services_to_check: Vec<MCPService> = {
1398                        let remote_guard = remote_services.read().await;
1399                        remote_guard.values().cloned().collect()
1400                    };
1401                    
1402                    // Perform health checks on all remote services
1403                    for service in services_to_check {
1404                        if let Some(sender) = network_sender.read().await.as_ref() {
1405                            Self::perform_health_check(
1406                                &service,
1407                                sender.as_ref(),
1408                                &service_health,
1409                                &health_event_tx,
1410                                &config,
1411                            ).await;
1412                        }
1413                    }
1414                }
1415            })
1416        };
1417        
1418        // Start heartbeat task
1419        let heartbeat_task = {
1420            let network_sender = Arc::clone(&network_sender);
1421            let health_event_tx = health_event_tx.clone();
1422            let config = config.clone();
1423            
1424            tokio::spawn(async move {
1425                let mut interval = tokio::time::interval(config.heartbeat_interval);
1426                
1427                loop {
1428                    interval.tick().await;
1429                    
1430                    if let Some(sender) = network_sender.read().await.as_ref() {
1431                        Self::send_heartbeat(
1432                            sender.as_ref(),
1433                            &health_event_tx,
1434                        ).await;
1435                    }
1436                }
1437            })
1438        };
1439        
1440        // Start heartbeat timeout monitoring task
1441        let timeout_task = {
1442            let service_health = Arc::clone(&service_health);
1443            let health_event_tx = health_event_tx.clone();
1444            let config = config.clone();
1445            
1446            tokio::spawn(async move {
1447                let mut interval = tokio::time::interval(Duration::from_secs(30)); // Check every 30 seconds
1448                
1449                loop {
1450                    interval.tick().await;
1451                    
1452                    Self::check_heartbeat_timeouts(
1453                        &service_health,
1454                        &health_event_tx,
1455                        &config,
1456                    ).await;
1457                }
1458            })
1459        };
1460        
1461        // Store task handles (in a real implementation, you'd want to store these for cleanup)
1462        tokio::spawn(async move {
1463            tokio::select! {
1464                _ = health_check_task => debug!("Health check task completed"),
1465                _ = heartbeat_task => debug!("Heartbeat task completed"),
1466                _ = timeout_task => debug!("Timeout monitoring task completed"),
1467            }
1468        });
1469        
1470        info!("Health monitoring started successfully");
1471        Ok(())
1472    }
1473    
1474    /// Perform health check on a remote service
1475    async fn perform_health_check(
1476        service: &MCPService,
1477        network_sender: &dyn NetworkSender,
1478        service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1479        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1480        config: &HealthMonitorConfig,
1481    ) {
1482        let start_time = Instant::now();
1483        let service_id = service.service_id.clone();
1484        let peer_id = service.node_id.clone();
1485        
1486        // Create health check request using CallTool for now
1487        // TODO: Add proper health check message type to MCPMessage enum
1488        let health_check_message = MCPMessage::CallTool {
1489            name: "health_check".to_string(),
1490            arguments: json!({
1491                "service_id": service_id,
1492                "timestamp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
1493            }),
1494        };
1495        
1496        // Serialize and send health check
1497        let result = match serde_json::to_vec(&health_check_message) {
1498            Ok(data) => {
1499                timeout(
1500                    config.health_check_timeout,
1501                    network_sender.send_message(&peer_id, MCP_PROTOCOL, data)
1502                ).await
1503            }
1504            Err(e) => {
1505                debug!("Failed to serialize health check message: {}", e);
1506                return;
1507            }
1508        };
1509        
1510        let response_time = start_time.elapsed();
1511        let success = result.is_ok() && result.unwrap().is_ok();
1512        
1513        // Update service health
1514        let mut health_guard = service_health.write().await;
1515        let health = health_guard.entry(service_id.clone()).or_insert_with(|| ServiceHealth {
1516            service_id: service_id.clone(),
1517            status: ServiceHealthStatus::Unknown,
1518            last_health_check: None,
1519            last_heartbeat: None,
1520            failure_count: 0,
1521            success_count: 0,
1522            avg_response_time: Duration::from_millis(0),
1523            error_message: None,
1524            health_history: Vec::new(),
1525        });
1526        
1527        // Record health check result
1528        let check_result = HealthCheckResult {
1529            timestamp: SystemTime::now(),
1530            success,
1531            response_time,
1532            error_message: if success { None } else { Some("Health check failed".to_string()) },
1533        };
1534        
1535        health.health_history.push(check_result);
1536        if health.health_history.len() > 10 {
1537            health.health_history.remove(0);
1538        }
1539        
1540        // Update counters and status
1541        let previous_status = health.status;
1542        if success {
1543            health.failure_count = 0;
1544            health.success_count += 1;
1545            health.last_health_check = Some(SystemTime::now());
1546            
1547            if health.success_count >= config.success_threshold {
1548                health.status = ServiceHealthStatus::Healthy;
1549                health.error_message = None;
1550            }
1551        } else {
1552            health.success_count = 0;
1553            health.failure_count += 1;
1554            
1555            if health.failure_count >= config.failure_threshold {
1556                health.status = ServiceHealthStatus::Unhealthy;
1557                health.error_message = Some("Health check failures exceeded threshold".to_string());
1558            }
1559        }
1560        
1561        // Update average response time
1562        let total_time: Duration = health.health_history.iter().map(|h| h.response_time).sum();
1563        health.avg_response_time = total_time / health.health_history.len() as u32;
1564        
1565        // Send health event if status changed
1566        if previous_status != health.status {
1567            let event = match health.status {
1568                ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1569                    service_id: service_id.clone(),
1570                    peer_id: peer_id.clone(),
1571                },
1572                ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1573                    service_id: service_id.clone(),
1574                    peer_id: peer_id.clone(),
1575                    error: health.error_message.clone().unwrap_or_else(|| "Unknown error".to_string()),
1576                },
1577                ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1578                    service_id: service_id.clone(),
1579                    peer_id: peer_id.clone(),
1580                    reason: "Performance degradation detected".to_string(),
1581                },
1582                _ => return, // No event for other statuses
1583            };
1584            
1585            if let Err(e) = health_event_tx.send(event) {
1586                debug!("Failed to send health event: {}", e);
1587            }
1588        }
1589    }
1590    
1591    /// Send heartbeat to announce service availability
1592    async fn send_heartbeat(
1593        network_sender: &dyn NetworkSender,
1594        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1595    ) {
1596        let heartbeat = Heartbeat {
1597            service_id: "mcp-server".to_string(),
1598            peer_id: network_sender.local_peer_id().clone(),
1599            timestamp: SystemTime::now(),
1600            load: 0.1, // TODO: Calculate actual load
1601            available_tools: vec![], // TODO: Get actual tools
1602            capabilities: MCPCapabilities {
1603                experimental: None,
1604                sampling: None,
1605                tools: Some(MCPToolsCapability { list_changed: Some(true) }),
1606                prompts: None,
1607                resources: None,
1608                logging: None,
1609            },
1610        };
1611        
1612        // Use CallTool for heartbeat until proper notification type is added
1613        let heartbeat_message = MCPMessage::CallTool {
1614            name: "heartbeat".to_string(),
1615            arguments: serde_json::to_value(&heartbeat).unwrap_or(json!({})),
1616        };
1617        
1618        if let Ok(data) = serde_json::to_vec(&heartbeat_message) {
1619            // Broadcast heartbeat to all known peers (in a real implementation)
1620            // For now, we'll just log the heartbeat
1621            debug!("Sending heartbeat for service: {}", heartbeat.service_id);
1622            
1623            // Send heartbeat event
1624            let event = HealthEvent::HeartbeatReceived {
1625                service_id: heartbeat.service_id.clone(),
1626                peer_id: heartbeat.peer_id.clone(),
1627                load: heartbeat.load,
1628            };
1629            
1630            if let Err(e) = health_event_tx.send(event) {
1631                debug!("Failed to send heartbeat event: {}", e);
1632            }
1633        }
1634    }
1635    
1636    /// Check for heartbeat timeouts and mark services as unhealthy
1637    async fn check_heartbeat_timeouts(
1638        service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1639        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1640        config: &HealthMonitorConfig,
1641    ) {
1642        let now = SystemTime::now();
1643        let mut health_guard = service_health.write().await;
1644        
1645        for (service_id, health) in health_guard.iter_mut() {
1646            if let Some(last_heartbeat) = health.last_heartbeat {
1647                if let Ok(duration) = now.duration_since(last_heartbeat) {
1648                    if duration > config.heartbeat_timeout {
1649                        let previous_status = health.status;
1650                        health.status = ServiceHealthStatus::Unhealthy;
1651                        health.error_message = Some("Heartbeat timeout".to_string());
1652                        
1653                        // Send timeout event if status changed
1654                        if previous_status != ServiceHealthStatus::Unhealthy {
1655                            let event = HealthEvent::HeartbeatTimeout {
1656                                service_id: service_id.clone(),
1657                                peer_id: PeerId::from("unknown".to_string()), // TODO: Store peer ID in health
1658                            };
1659                            
1660                            if let Err(e) = health_event_tx.send(event) {
1661                                debug!("Failed to send timeout event: {}", e);
1662                            }
1663                        }
1664                    }
1665                }
1666            }
1667        }
1668    }
1669    
1670    /// Get server statistics
1671    pub async fn get_stats(&self) -> MCPServerStats {
1672        self.stats.read().await.clone()
1673    }
1674    
1675    /// Handle incoming heartbeat from a remote service
1676    pub async fn handle_heartbeat(&self, heartbeat: Heartbeat) -> Result<()> {
1677        let service_id = heartbeat.service_id.clone();
1678        let peer_id = heartbeat.peer_id.clone();
1679        
1680        // Update service health with heartbeat information
1681        {
1682            let mut health_guard = self.service_health.write().await;
1683            let health = health_guard.entry(service_id.clone()).or_insert_with(|| ServiceHealth {
1684                service_id: service_id.clone(),
1685                status: ServiceHealthStatus::Healthy,
1686                last_health_check: None,
1687                last_heartbeat: None,
1688                failure_count: 0,
1689                success_count: 0,
1690                avg_response_time: Duration::from_millis(0),
1691                error_message: None,
1692                health_history: Vec::new(),
1693            });
1694            
1695            health.last_heartbeat = Some(heartbeat.timestamp);
1696            health.status = ServiceHealthStatus::Healthy;
1697            health.failure_count = 0;
1698            health.error_message = None;
1699        }
1700        
1701        // Send heartbeat received event
1702        let event = HealthEvent::HeartbeatReceived {
1703            service_id,
1704            peer_id,
1705            load: heartbeat.load,
1706        };
1707        
1708        if let Err(e) = self.health_event_tx.send(event) {
1709            debug!("Failed to send heartbeat received event: {}", e);
1710        }
1711        
1712        info!("Heartbeat received from service: {} (load: {:.2})", heartbeat.service_id, heartbeat.load);
1713        Ok(())
1714    }
1715    
1716    /// Get health status of a specific service
1717    pub async fn get_service_health(&self, service_id: &str) -> Option<ServiceHealth> {
1718        let health_guard = self.service_health.read().await;
1719        health_guard.get(service_id).cloned()
1720    }
1721    
1722    /// Get health status of all services
1723    pub async fn get_all_service_health(&self) -> HashMap<String, ServiceHealth> {
1724        self.service_health.read().await.clone()
1725    }
1726    
1727    /// Get healthy services only
1728    pub async fn get_healthy_services(&self) -> Vec<String> {
1729        let health_guard = self.service_health.read().await;
1730        health_guard
1731            .iter()
1732            .filter(|(_, health)| health.status == ServiceHealthStatus::Healthy)
1733            .map(|(service_id, _)| service_id.clone())
1734            .collect()
1735    }
1736    
1737    /// Update service health status manually
1738    pub async fn update_service_health(&self, service_id: String, status: ServiceHealthStatus, error_message: Option<String>) {
1739        let mut health_guard = self.service_health.write().await;
1740        if let Some(health) = health_guard.get_mut(&service_id) {
1741            let previous_status = health.status;
1742            health.status = status;
1743            health.error_message = error_message.clone();
1744            
1745            // Send event if status changed
1746            if previous_status != status {
1747                let event = match status {
1748                    ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1749                        service_id: service_id.clone(),
1750                        peer_id: PeerId::from("manual".to_string()),
1751                    },
1752                    ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1753                        service_id: service_id.clone(),
1754                        peer_id: PeerId::from("manual".to_string()),
1755                        error: error_message.unwrap_or_else(|| "Manually set to unhealthy".to_string()),
1756                    },
1757                    ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1758                        service_id: service_id.clone(),
1759                        peer_id: PeerId::from("manual".to_string()),
1760                        reason: "Manually set to degraded".to_string(),
1761                    },
1762                    _ => return,
1763                };
1764                
1765                if let Err(e) = self.health_event_tx.send(event) {
1766                    debug!("Failed to send manual health update event: {}", e);
1767                }
1768            }
1769        }
1770    }
1771    
1772    /// Subscribe to health events
1773    pub fn subscribe_health_events(&self) -> mpsc::UnboundedReceiver<HealthEvent> {
1774        // In a real implementation, you'd want multiple subscribers
1775        // For now, we create a new channel
1776        let (_tx, rx) = mpsc::unbounded_channel();
1777        rx
1778    }
1779    
1780    /// Check if a service is healthy
1781    pub async fn is_service_healthy(&self, service_id: &str) -> bool {
1782        if let Some(health) = self.get_service_health(service_id).await {
1783            health.status == ServiceHealthStatus::Healthy
1784        } else {
1785            false
1786        }
1787    }
1788    
1789    /// Get service load balancing information
1790    pub async fn get_service_load_info(&self) -> HashMap<String, f32> {
1791        // In a real implementation, this would return actual load metrics
1792        // For now, return mock data based on health status
1793        let health_guard = self.service_health.read().await;
1794        health_guard
1795            .iter()
1796            .map(|(service_id, health)| {
1797                let load = match health.status {
1798                    ServiceHealthStatus::Healthy => 0.1,
1799                    ServiceHealthStatus::Degraded => 0.7,
1800                    ServiceHealthStatus::Unhealthy => 1.0,
1801                    ServiceHealthStatus::Disabled => 0.0,
1802                    ServiceHealthStatus::Unknown => 0.5,
1803                };
1804                (service_id.clone(), load)
1805            })
1806            .collect()
1807    }
1808    
1809    
1810    /// Call a tool on a remote node
1811    pub async fn call_remote_tool(&self, peer_id: &PeerId, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
1812        let request_id = uuid::Uuid::new_v4().to_string();
1813        
1814        // Create MCP call tool message
1815        let mcp_message = MCPMessage::CallTool {
1816            name: tool_name.to_string(),
1817            arguments,
1818        };
1819        
1820        // Create P2P message wrapper
1821        let p2p_message = P2PMCPMessage {
1822            message_type: P2PMCPMessageType::Request,
1823            message_id: request_id.clone(),
1824            source_peer: context.caller_id.clone(),
1825            target_peer: Some(peer_id.clone()),
1826            timestamp: SystemTime::now()
1827                .duration_since(std::time::UNIX_EPOCH)
1828                .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1829                .as_secs(),
1830            payload: mcp_message,
1831            ttl: 5, // Max 5 hops
1832        };
1833        
1834        // Serialize the message
1835        let message_data = serde_json::to_vec(&p2p_message)
1836            .map_err(|e| P2PError::Serialization(e))?;
1837        
1838        if message_data.len() > MAX_MESSAGE_SIZE {
1839            return Err(P2PError::MCP("Message too large".to_string()));
1840        }
1841        
1842        // Create response channel
1843        let (response_tx, _response_rx) = oneshot::channel::<MCPResponse>();
1844        
1845        // Store response handler
1846        {
1847            let mut handlers = self.request_handlers.write().await;
1848            handlers.insert(request_id.clone(), response_tx);
1849        }
1850        
1851        // Send via P2P network
1852        if let Some(ref network_sender) = *self.network_sender.read().await {
1853            // Send the message to the target peer
1854            network_sender.send_message(peer_id, MCP_PROTOCOL, message_data).await?;
1855            
1856            // Wait for response (simplified - in production this would be more sophisticated)
1857            // For now, return a placeholder response indicating successful sending
1858            debug!("MCP remote tool call sent to peer {}, tool: {}", peer_id, tool_name);
1859            
1860            // TODO: Implement proper response waiting mechanism
1861            // This would involve storing the request_id and waiting for a matching response
1862            Ok(json!({
1863                "status": "sent",
1864                "message": "Remote tool call sent successfully",
1865                "peer_id": peer_id,
1866                "tool_name": tool_name
1867            }))
1868        } else {
1869            Err(P2PError::MCP("Network sender not configured".to_string()))
1870        }
1871    }
1872    
1873    /// Handle incoming P2P MCP message
1874    pub async fn handle_p2p_message(&self, message_data: &[u8], source_peer: &PeerId) -> Result<Option<Vec<u8>>> {
1875        // Deserialize the P2P message
1876        let p2p_message: P2PMCPMessage = serde_json::from_slice(message_data)
1877            .map_err(|e| P2PError::Serialization(e))?;
1878        
1879        debug!("Received MCP message from {}: {:?}", source_peer, p2p_message.message_type);
1880        
1881        // Check if this is a heartbeat or health check
1882        if let MCPMessage::CallTool { name, arguments } = &p2p_message.payload {
1883            if name == "heartbeat" {
1884                if let Ok(heartbeat) = serde_json::from_value::<Heartbeat>(arguments.clone()) {
1885                    self.handle_heartbeat(heartbeat).await?;
1886                    return Ok(None);
1887                }
1888            } else if name == "health_check" {
1889                // Respond to health check
1890                let health_response = MCPMessage::CallToolResult {
1891                    content: vec![],
1892                    is_error: false,
1893                };
1894                
1895                let response_message = P2PMCPMessage {
1896                    message_type: P2PMCPMessageType::Response,
1897                    message_id: p2p_message.message_id.clone(),
1898                    source_peer: source_peer.clone(),
1899                    target_peer: Some(p2p_message.source_peer.clone()),
1900                    timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(),
1901                    payload: health_response,
1902                    ttl: 3,
1903                };
1904                
1905                return Ok(Some(serde_json::to_vec(&response_message)?));
1906            }
1907        }
1908        
1909        match p2p_message.message_type {
1910            P2PMCPMessageType::Request => {
1911                self.handle_remote_request(p2p_message).await
1912            }
1913            P2PMCPMessageType::Response => {
1914                self.handle_remote_response(p2p_message).await?;
1915                Ok(None) // Responses don't generate replies
1916            }
1917            P2PMCPMessageType::ServiceAdvertisement => {
1918                self.handle_service_advertisement(p2p_message).await?;
1919                Ok(None)
1920            }
1921            P2PMCPMessageType::ServiceDiscovery => {
1922                self.handle_service_discovery(p2p_message).await
1923            }
1924            P2PMCPMessageType::Heartbeat => {
1925                debug!("Received heartbeat message");
1926                Ok(None)
1927            }
1928            P2PMCPMessageType::HealthCheck => {
1929                debug!("Received health check message");
1930                Ok(None)
1931            }
1932        }
1933    }
1934    
1935    /// Handle remote tool call request
1936    async fn handle_remote_request(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
1937        match message.payload {
1938            MCPMessage::CallTool { name, arguments } => {
1939                let context = MCPCallContext {
1940                    caller_id: message.source_peer.clone(),
1941                    timestamp: SystemTime::now(),
1942                    timeout: DEFAULT_CALL_TIMEOUT,
1943                    auth_info: None,
1944                    metadata: HashMap::new(),
1945                };
1946                
1947                // Call the local tool
1948                let result = self.call_tool(&name, arguments, context).await;
1949                
1950                // Create response message
1951                let response_payload = match result {
1952                    Ok(value) => MCPMessage::CallToolResult {
1953                        content: vec![MCPContent::Text { text: value.to_string() }],
1954                        is_error: false,
1955                    },
1956                    Err(e) => MCPMessage::Error {
1957                        code: -1,
1958                        message: e.to_string(),
1959                        data: None,
1960                    },
1961                };
1962                
1963                let response_message = P2PMCPMessage {
1964                    message_type: P2PMCPMessageType::Response,
1965                    message_id: message.message_id,
1966                    source_peer: "local".to_string(), // TODO: Get actual local peer ID
1967                    target_peer: Some(message.source_peer),
1968                    timestamp: SystemTime::now()
1969                        .duration_since(std::time::UNIX_EPOCH)
1970                        .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1971                        .as_secs(),
1972                    payload: response_payload,
1973                    ttl: message.ttl.saturating_sub(1),
1974                };
1975                
1976                // Serialize response
1977                let response_data = serde_json::to_vec(&response_message)
1978                    .map_err(|e| P2PError::Serialization(e))?;
1979                
1980                Ok(Some(response_data))
1981            }
1982            MCPMessage::ListTools { cursor: _ } => {
1983                let (tools, _) = self.list_tools(None).await?;
1984                
1985                let response_payload = MCPMessage::ListToolsResult {
1986                    tools,
1987                    next_cursor: None,
1988                };
1989                
1990                let response_message = P2PMCPMessage {
1991                    message_type: P2PMCPMessageType::Response,
1992                    message_id: message.message_id,
1993                    source_peer: "local".to_string(), // TODO: Get actual local peer ID
1994                    target_peer: Some(message.source_peer),
1995                    timestamp: SystemTime::now()
1996                        .duration_since(std::time::UNIX_EPOCH)
1997                        .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1998                        .as_secs(),
1999                    payload: response_payload,
2000                    ttl: message.ttl.saturating_sub(1),
2001                };
2002                
2003                let response_data = serde_json::to_vec(&response_message)
2004                    .map_err(|e| P2PError::Serialization(e))?;
2005                
2006                Ok(Some(response_data))
2007            }
2008            _ => {
2009                // Unsupported request type
2010                let error_response = P2PMCPMessage {
2011                    message_type: P2PMCPMessageType::Response,
2012                    message_id: message.message_id,
2013                    source_peer: "local".to_string(), // TODO: Get actual local peer ID
2014                    target_peer: Some(message.source_peer),
2015                    timestamp: SystemTime::now()
2016                        .duration_since(std::time::UNIX_EPOCH)
2017                        .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
2018                        .as_secs(),
2019                    payload: MCPMessage::Error {
2020                        code: -2,
2021                        message: "Unsupported request type".to_string(),
2022                        data: None,
2023                    },
2024                    ttl: message.ttl.saturating_sub(1),
2025                };
2026                
2027                let response_data = serde_json::to_vec(&error_response)
2028                    .map_err(|e| P2PError::Serialization(e))?;
2029                
2030                Ok(Some(response_data))
2031            }
2032        }
2033    }
2034    
2035    // Security-related methods
2036    
2037    /// Generate authentication token for peer
2038    pub async fn generate_auth_token(&self, peer_id: &PeerId, permissions: Vec<MCPPermission>, ttl: Duration) -> Result<String> {
2039        if let Some(security_manager) = &self.security_manager {
2040            let token = security_manager.generate_token(peer_id, permissions, ttl).await?;
2041            
2042            // Log authentication event
2043            let mut details = HashMap::new();
2044            details.insert("action".to_string(), "token_generated".to_string());
2045            details.insert("ttl_seconds".to_string(), ttl.as_secs().to_string());
2046            
2047            self.audit_logger.log_event(
2048                "authentication".to_string(),
2049                peer_id.clone(),
2050                details,
2051                AuditSeverity::Info,
2052            ).await;
2053            
2054            Ok(token)
2055        } else {
2056            Err(P2PError::MCP("Authentication not enabled".to_string()))
2057        }
2058    }
2059    
2060    /// Verify authentication token
2061    pub async fn verify_auth_token(&self, token: &str) -> Result<TokenPayload> {
2062        if let Some(security_manager) = &self.security_manager {
2063            match security_manager.verify_token(token).await {
2064                Ok(payload) => {
2065                    // Log successful verification
2066                    let mut details = HashMap::new();
2067                    details.insert("action".to_string(), "token_verified".to_string());
2068                    details.insert("subject".to_string(), payload.sub.clone());
2069                    
2070                    self.audit_logger.log_event(
2071                        "authentication".to_string(),
2072                        payload.iss.clone(),
2073                        details,
2074                        AuditSeverity::Info,
2075                    ).await;
2076                    
2077                    Ok(payload)
2078                }
2079                Err(e) => {
2080                    // Log failed verification
2081                    let mut details = HashMap::new();
2082                    details.insert("action".to_string(), "token_verification_failed".to_string());
2083                    details.insert("error".to_string(), e.to_string());
2084                    
2085                    self.audit_logger.log_event(
2086                        "authentication".to_string(),
2087                        "unknown".to_string(),
2088                        details,
2089                        AuditSeverity::Warning,
2090                    ).await;
2091                    
2092                    Err(e)
2093                }
2094            }
2095        } else {
2096            Err(P2PError::MCP("Authentication not enabled".to_string()))
2097        }
2098    }
2099    
2100    /// Check if peer has permission for operation
2101    pub async fn check_permission(&self, peer_id: &PeerId, permission: &MCPPermission) -> Result<bool> {
2102        if let Some(security_manager) = &self.security_manager {
2103            security_manager.check_permission(peer_id, permission).await
2104        } else {
2105            // If security is disabled, allow all operations
2106            Ok(true)
2107        }
2108    }
2109    
2110    /// Check rate limit for peer
2111    pub async fn check_rate_limit(&self, peer_id: &PeerId) -> Result<bool> {
2112        if let Some(security_manager) = &self.security_manager {
2113            let allowed = security_manager.check_rate_limit(peer_id).await?;
2114            
2115            if !allowed {
2116                // Log rate limit violation
2117                let mut details = HashMap::new();
2118                details.insert("action".to_string(), "rate_limit_exceeded".to_string());
2119                
2120                self.audit_logger.log_event(
2121                    "rate_limiting".to_string(),
2122                    peer_id.clone(),
2123                    details,
2124                    AuditSeverity::Warning,
2125                ).await;
2126            }
2127            
2128            Ok(allowed)
2129        } else {
2130            // If rate limiting is disabled, allow all requests
2131            Ok(true)
2132        }
2133    }
2134    
2135    /// Grant permission to peer
2136    pub async fn grant_permission(&self, peer_id: &PeerId, permission: MCPPermission) -> Result<()> {
2137        if let Some(security_manager) = &self.security_manager {
2138            security_manager.grant_permission(peer_id, permission.clone()).await?;
2139            
2140            // Log permission grant
2141            let mut details = HashMap::new();
2142            details.insert("action".to_string(), "permission_granted".to_string());
2143            details.insert("permission".to_string(), permission.as_str().to_string());
2144            
2145            self.audit_logger.log_event(
2146                "authorization".to_string(),
2147                peer_id.clone(),
2148                details,
2149                AuditSeverity::Info,
2150            ).await;
2151            
2152            Ok(())
2153        } else {
2154            Err(P2PError::MCP("Security not enabled".to_string()))
2155        }
2156    }
2157    
2158    /// Revoke permission from peer
2159    pub async fn revoke_permission(&self, peer_id: &PeerId, permission: &MCPPermission) -> Result<()> {
2160        if let Some(security_manager) = &self.security_manager {
2161            security_manager.revoke_permission(peer_id, permission).await?;
2162            
2163            // Log permission revocation
2164            let mut details = HashMap::new();
2165            details.insert("action".to_string(), "permission_revoked".to_string());
2166            details.insert("permission".to_string(), permission.as_str().to_string());
2167            
2168            self.audit_logger.log_event(
2169                "authorization".to_string(),
2170                peer_id.clone(),
2171                details,
2172                AuditSeverity::Info,
2173            ).await;
2174            
2175            Ok(())
2176        } else {
2177            Err(P2PError::MCP("Security not enabled".to_string()))
2178        }
2179    }
2180    
2181    /// Add trusted peer
2182    pub async fn add_trusted_peer(&self, peer_id: PeerId) -> Result<()> {
2183        if let Some(security_manager) = &self.security_manager {
2184            security_manager.add_trusted_peer(peer_id.clone()).await?;
2185            
2186            // Log trusted peer addition
2187            let mut details = HashMap::new();
2188            details.insert("action".to_string(), "trusted_peer_added".to_string());
2189            
2190            self.audit_logger.log_event(
2191                "trust_management".to_string(),
2192                peer_id,
2193                details,
2194                AuditSeverity::Info,
2195            ).await;
2196            
2197            Ok(())
2198        } else {
2199            Err(P2PError::MCP("Security not enabled".to_string()))
2200        }
2201    }
2202    
2203    /// Check if peer is trusted
2204    pub async fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
2205        if let Some(security_manager) = &self.security_manager {
2206            security_manager.is_trusted_peer(peer_id).await
2207        } else {
2208            false
2209        }
2210    }
2211    
2212    /// Set security policy for tool
2213    pub async fn set_tool_security_policy(&self, tool_name: String, level: SecurityLevel) -> Result<()> {
2214        if let Some(security_manager) = &self.security_manager {
2215            security_manager.set_tool_policy(tool_name.clone(), level.clone()).await?;
2216            
2217            // Log policy change
2218            let mut details = HashMap::new();
2219            details.insert("action".to_string(), "tool_policy_set".to_string());
2220            details.insert("tool_name".to_string(), tool_name);
2221            details.insert("security_level".to_string(), format!("{:?}", level));
2222            
2223            self.audit_logger.log_event(
2224                "security_policy".to_string(),
2225                "system".to_string(),
2226                details,
2227                AuditSeverity::Info,
2228            ).await;
2229            
2230            Ok(())
2231        } else {
2232            Err(P2PError::MCP("Security not enabled".to_string()))
2233        }
2234    }
2235    
2236    /// Get security policy for tool
2237    pub async fn get_tool_security_policy(&self, tool_name: &str) -> SecurityLevel {
2238        if let Some(security_manager) = &self.security_manager {
2239            security_manager.get_tool_policy(tool_name).await
2240        } else {
2241            SecurityLevel::Public
2242        }
2243    }
2244    
2245    /// Get peer security statistics
2246    pub async fn get_peer_security_stats(&self, peer_id: &PeerId) -> Option<PeerACL> {
2247        if let Some(security_manager) = &self.security_manager {
2248            security_manager.get_peer_stats(peer_id).await
2249        } else {
2250            None
2251        }
2252    }
2253    
2254    /// Get recent security audit entries
2255    pub async fn get_security_audit(&self, limit: Option<usize>) -> Vec<SecurityAuditEntry> {
2256        self.audit_logger.get_recent_entries(limit).await
2257    }
2258    
2259    /// Perform security housekeeping
2260    pub async fn security_cleanup(&self) -> Result<()> {
2261        if let Some(security_manager) = &self.security_manager {
2262            security_manager.cleanup().await?;
2263        }
2264        Ok(())
2265    }
2266    
2267    /// Handle remote response
2268    async fn handle_remote_response(&self, message: P2PMCPMessage) -> Result<()> {
2269        // Find the waiting request handler
2270        let response_tx = {
2271            let mut handlers = self.request_handlers.write().await;
2272            handlers.remove(&message.message_id)
2273        };
2274        
2275        if let Some(tx) = response_tx {
2276            let response = MCPResponse {
2277                request_id: message.message_id,
2278                message: message.payload,
2279                timestamp: SystemTime::now(),
2280                processing_time: Duration::from_millis(0), // TODO: Calculate actual processing time
2281            };
2282            
2283            // Send response to waiting caller
2284            let _ = tx.send(response);
2285        } else {
2286            debug!("Received response for unknown request: {}", message.message_id);
2287        }
2288        
2289        Ok(())
2290    }
2291    
2292    /// Announce local services to the network
2293    pub async fn announce_local_services(&self) -> Result<()> {
2294        if let Some(dht) = &self.dht {
2295            // Create local service announcement
2296            let local_service = self.create_local_service_announcement().await?;
2297            
2298            // Store in DHT
2299            self.store_service_in_dht(&local_service, dht).await?;
2300            
2301            // Broadcast service announcement to connected peers
2302            if let Some(network_sender) = &*self.network_sender.read().await {
2303                self.broadcast_service_announcement(&local_service, network_sender).await?;
2304            }
2305            
2306            info!("Announced local MCP service with {} tools", local_service.tools.len());
2307        }
2308        
2309        Ok(())
2310    }
2311    
2312    /// Create a service announcement for our local node
2313    async fn create_local_service_announcement(&self) -> Result<MCPService> {
2314        let tools = self.tools.read().await;
2315        let tool_names: Vec<String> = tools.keys().cloned().collect();
2316        
2317        let service = MCPService {
2318            service_id: format!("mcp-{}", self.config.server_name),
2319            node_id: "local".to_string(), // TODO: Get actual peer ID from network layer
2320            tools: tool_names,
2321            capabilities: MCPCapabilities {
2322                experimental: None,
2323                sampling: None,
2324                tools: Some(MCPToolsCapability {
2325                    list_changed: Some(true),
2326                }),
2327                prompts: None,
2328                resources: None,
2329                logging: None,
2330            },
2331            metadata: MCPServiceMetadata {
2332                name: self.config.server_name.clone(),
2333                version: self.config.server_version.clone(),
2334                description: Some("P2P MCP Service".to_string()),
2335                tags: vec!["p2p".to_string(), "mcp".to_string()],
2336                health_status: ServiceHealthStatus::Healthy,
2337                load_metrics: self.get_current_load_metrics().await,
2338            },
2339            registered_at: SystemTime::now(),
2340            endpoint: MCPEndpoint {
2341                protocol: "p2p".to_string(),
2342                address: "local".to_string(), // TODO: Get actual P2P address
2343                port: None,
2344                tls: true,
2345                auth_required: false,
2346            },
2347        };
2348        
2349        Ok(service)
2350    }
2351    
2352    /// Get current service load metrics
2353    async fn get_current_load_metrics(&self) -> ServiceLoadMetrics {
2354        let stats = self.stats.read().await;
2355        
2356        ServiceLoadMetrics {
2357            active_requests: 0, // TODO: Track active requests
2358            requests_per_second: stats.total_requests as f64 / 60.0, // Rough estimate
2359            avg_response_time_ms: stats.avg_response_time.as_millis() as f64,
2360            error_rate: if stats.total_requests > 0 {
2361                stats.total_errors as f64 / stats.total_requests as f64
2362            } else {
2363                0.0
2364            },
2365            cpu_usage: 0.0, // TODO: Get actual CPU usage
2366            memory_usage: 0, // TODO: Get actual memory usage
2367        }
2368    }
2369    
2370    /// Store service information in DHT
2371    async fn store_service_in_dht(&self, service: &MCPService, dht: &Arc<RwLock<DHT>>) -> Result<()> {
2372        // Store individual service record
2373        let service_key = Key::new(format!("mcp:service:{}", service.service_id).as_bytes());
2374        let service_data = serde_json::to_vec(service)
2375            .map_err(|e| P2PError::Serialization(e))?;
2376        
2377        let mut dht_guard = dht.write().await;
2378        dht_guard.put(service_key.clone(), service_data).await
2379            .map_err(|e| P2PError::DHT(format!("Failed to store service in DHT: {}", e)))?;
2380        
2381        // Also add to services index
2382        let services_key = Key::new(b"mcp:services:index");
2383        let mut service_ids = match dht_guard.get(&services_key).await {
2384            Some(record) => {
2385                serde_json::from_slice::<Vec<String>>(&record.value).unwrap_or_default()
2386            }
2387            None => Vec::new(),
2388        };
2389        
2390        if !service_ids.contains(&service.service_id) {
2391            service_ids.push(service.service_id.clone());
2392            
2393            let index_data = serde_json::to_vec(&service_ids)
2394                .map_err(|e| P2PError::Serialization(e))?;
2395            
2396            dht_guard.put(services_key, index_data).await
2397                .map_err(|e| P2PError::DHT(format!("Failed to update services index: {}", e)))?;
2398        }
2399        
2400        Ok(())
2401    }
2402    
2403    /// Broadcast service announcement to connected peers
2404    async fn broadcast_service_announcement(&self, service: &MCPService, network_sender: &Arc<dyn NetworkSender>) -> Result<()> {
2405        let announcement = P2PMCPMessage {
2406            message_type: P2PMCPMessageType::ServiceAdvertisement,
2407            message_id: uuid::Uuid::new_v4().to_string(),
2408            source_peer: network_sender.local_peer_id().clone(),
2409            target_peer: None, // Broadcast to all peers
2410            timestamp: SystemTime::now()
2411                .duration_since(std::time::UNIX_EPOCH)
2412                .unwrap_or_default()
2413                .as_secs(),
2414            payload: MCPMessage::ListToolsResult {
2415                tools: service.tools.iter().map(|tool_name| MCPTool {
2416                    name: tool_name.clone(),
2417                    description: format!("Tool from {}", service.metadata.name),
2418                    input_schema: json!({"type": "object"}),
2419                }).collect(),
2420                next_cursor: None,
2421            },
2422            ttl: 3,
2423        };
2424        
2425        let announcement_data = serde_json::to_vec(&announcement)
2426            .map_err(|e| P2PError::Serialization(e))?;
2427        
2428        // TODO: Broadcast to all connected peers
2429        // For now, this would require getting the list of connected peers from the network layer
2430        debug!("Service announcement prepared for broadcast: {} tools", service.tools.len());
2431        
2432        Ok(())
2433    }
2434    
2435    /// Discover services from other peers
2436    pub async fn discover_remote_services(&self) -> Result<Vec<MCPService>> {
2437        if let Some(dht) = &self.dht {
2438            let services_key = Key::new(b"mcp:services:index");
2439            let dht_guard = dht.read().await;
2440            
2441            let service_ids = match dht_guard.get(&services_key).await {
2442                Some(record) => {
2443                    serde_json::from_slice::<Vec<String>>(&record.value).unwrap_or_default()
2444                }
2445                None => {
2446                    debug!("No services index found in DHT");
2447                    return Ok(Vec::new());
2448                }
2449            };
2450            
2451            let mut discovered_services = Vec::new();
2452            
2453            for service_id in service_ids {
2454                let service_key = Key::new(format!("mcp:service:{}", service_id).as_bytes());
2455                
2456                if let Some(record) = dht_guard.get(&service_key).await {
2457                    match serde_json::from_slice::<MCPService>(&record.value) {
2458                        Ok(service) => {
2459                            // Don't include our own service
2460                            if service.service_id != format!("mcp-{}", self.config.server_name) {
2461                                discovered_services.push(service);
2462                            }
2463                        }
2464                        Err(e) => {
2465                            warn!("Failed to deserialize service {}: {}", service_id, e);
2466                        }
2467                    }
2468                }
2469            }
2470            
2471            debug!("Discovered {} remote MCP services", discovered_services.len());
2472            Ok(discovered_services)
2473        } else {
2474            Ok(Vec::new())
2475        }
2476    }
2477    
2478    /// Refresh service discovery and update remote services cache
2479    pub async fn refresh_service_discovery(&self) -> Result<()> {
2480        let discovered_services = self.discover_remote_services().await?;
2481        
2482        // Update remote services cache
2483        {
2484            let mut remote_cache = self.remote_services.write().await;
2485            remote_cache.clear();
2486            
2487            for service in discovered_services {
2488                remote_cache.insert(service.service_id.clone(), service);
2489            }
2490        }
2491        
2492        // Also update local services registry
2493        {
2494            let local_service = self.create_local_service_announcement().await?;
2495            let mut local_cache = self.local_services.write().await;
2496            local_cache.insert(local_service.service_id.clone(), local_service);
2497        }
2498        
2499        debug!("Service discovery refresh completed");
2500        Ok(())
2501    }
2502    
2503    /// Get all known services (local + remote)
2504    pub async fn get_all_services(&self) -> Result<Vec<MCPService>> {
2505        let mut all_services = Vec::new();
2506        
2507        // Add local services
2508        {
2509            let local_services = self.local_services.read().await;
2510            all_services.extend(local_services.values().cloned());
2511        }
2512        
2513        // Add remote services
2514        {
2515            let remote_services = self.remote_services.read().await;
2516            all_services.extend(remote_services.values().cloned());
2517        }
2518        
2519        Ok(all_services)
2520    }
2521    
2522    /// Find services that provide a specific tool
2523    pub async fn find_services_with_tool(&self, tool_name: &str) -> Result<Vec<MCPService>> {
2524        let all_services = self.get_all_services().await?;
2525        
2526        let matching_services = all_services
2527            .into_iter()
2528            .filter(|service| service.tools.contains(&tool_name.to_string()))
2529            .collect();
2530        
2531        Ok(matching_services)
2532    }
2533    
2534    /// Handle service advertisement
2535    pub async fn handle_service_advertisement(&self, message: P2PMCPMessage) -> Result<()> {
2536        debug!("Received service advertisement from peer: {}", message.source_peer);
2537        
2538        // Extract tools from the advertisement
2539        if let MCPMessage::ListToolsResult { tools, .. } = message.payload {
2540            // Create a service record from the advertisement
2541            let service = MCPService {
2542                service_id: format!("mcp-{}", message.source_peer),
2543                node_id: message.source_peer.clone(),
2544                tools: tools.iter().map(|t| t.name.clone()).collect(),
2545                capabilities: MCPCapabilities {
2546                    experimental: None,
2547                    sampling: None,
2548                    tools: Some(MCPToolsCapability {
2549                        list_changed: Some(true),
2550                    }),
2551                    prompts: None,
2552                    resources: None,
2553                    logging: None,
2554                },
2555                metadata: MCPServiceMetadata {
2556                    name: format!("Remote MCP Service - {}", message.source_peer),
2557                    version: "unknown".to_string(),
2558                    description: Some("Remote P2P MCP Service".to_string()),
2559                    tags: vec!["p2p".to_string(), "remote".to_string()],
2560                    health_status: ServiceHealthStatus::Healthy,
2561                    load_metrics: ServiceLoadMetrics {
2562                        active_requests: 0,
2563                        requests_per_second: 0.0,
2564                        avg_response_time_ms: 0.0,
2565                        error_rate: 0.0,
2566                        cpu_usage: 0.0,
2567                        memory_usage: 0,
2568                    },
2569                },
2570                registered_at: SystemTime::now(),
2571                endpoint: MCPEndpoint {
2572                    protocol: "p2p".to_string(),
2573                    address: message.source_peer.clone(),
2574                    port: None,
2575                    tls: true,
2576                    auth_required: false,
2577                },
2578            };
2579            
2580            // Update remote services cache
2581            {
2582                let mut remote_services = self.remote_services.write().await;
2583                remote_services.insert(service.service_id.clone(), service.clone());
2584            }
2585            
2586            // Store in DHT if available
2587            if let Some(dht) = &self.dht {
2588                if let Err(e) = self.store_service_in_dht(&service, dht).await {
2589                    warn!("Failed to store remote service in DHT: {}", e);
2590                }
2591            }
2592            
2593            info!("Registered remote MCP service from {} with {} tools", 
2594                  message.source_peer, tools.len());
2595        }
2596        
2597        Ok(())
2598    }
2599    
2600    /// Handle service discovery request
2601    pub async fn handle_service_discovery(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
2602        // Create service advertisement with our local services
2603        let local_services: Vec<MCPService> = {
2604            let services = self.local_services.read().await;
2605            services.values().cloned().collect()
2606        };
2607        
2608        if !local_services.is_empty() {
2609            let advertisement = P2PMCPMessage {
2610                message_type: P2PMCPMessageType::ServiceAdvertisement,
2611                message_id: uuid::Uuid::new_v4().to_string(),
2612                source_peer: "local".to_string(), // TODO: Get actual local peer ID
2613                target_peer: Some(message.source_peer),
2614                timestamp: SystemTime::now()
2615                    .duration_since(std::time::UNIX_EPOCH)
2616                    .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
2617                    .as_secs(),
2618                payload: MCPMessage::ListToolsResult {
2619                    tools: local_services.into_iter()
2620                        .flat_map(|s| s.tools.into_iter().map(|t| MCPTool {
2621                            name: t,
2622                            description: "Remote tool".to_string(),
2623                            input_schema: json!({"type": "object"}),
2624                        }))
2625                        .collect(),
2626                    next_cursor: None,
2627                },
2628                ttl: message.ttl.saturating_sub(1),
2629            };
2630            
2631            let response_data = serde_json::to_vec(&advertisement)
2632                .map_err(|e| P2PError::Serialization(e))?;
2633            
2634            Ok(Some(response_data))
2635        } else {
2636            Ok(None)
2637        }
2638    }
2639    
2640    /// Shutdown the server
2641    pub async fn shutdown(&self) -> Result<()> {
2642        info!("Shutting down MCP server");
2643        
2644        // Close all sessions
2645        {
2646            let mut sessions = self.sessions.write().await;
2647            for session in sessions.values_mut() {
2648                session.state = MCPSessionState::Terminated;
2649            }
2650            sessions.clear();
2651        }
2652        
2653        // TODO: Cleanup tasks and channels
2654        
2655        info!("MCP server shutdown complete");
2656        Ok(())
2657    }
2658}
2659
2660impl Tool {
2661    /// Create a new tool
2662    pub fn new(name: &str, description: &str, input_schema: Value) -> ToolBuilder {
2663        ToolBuilder {
2664            name: name.to_string(),
2665            description: description.to_string(),
2666            input_schema,
2667            handler: None,
2668            tags: Vec::new(),
2669        }
2670    }
2671}
2672
2673/// Builder for creating tools
2674pub struct ToolBuilder {
2675    name: String,
2676    description: String,
2677    input_schema: Value,
2678    handler: Option<Box<dyn ToolHandler + Send + Sync>>,
2679    tags: Vec<String>,
2680}
2681
2682impl ToolBuilder {
2683    /// Set tool handler
2684    pub fn handler<H: ToolHandler + Send + Sync + 'static>(mut self, handler: H) -> Self {
2685        self.handler = Some(Box::new(handler));
2686        self
2687    }
2688    
2689    /// Add tags
2690    pub fn tags(mut self, tags: Vec<String>) -> Self {
2691        self.tags = tags;
2692        self
2693    }
2694    
2695    /// Build the tool
2696    pub fn build(self) -> Result<Tool> {
2697        let handler = self.handler
2698            .ok_or_else(|| P2PError::MCP("Tool handler is required".to_string()))?;
2699        
2700        let definition = MCPTool {
2701            name: self.name,
2702            description: self.description,
2703            input_schema: self.input_schema,
2704        };
2705        
2706        let metadata = ToolMetadata {
2707            created_at: SystemTime::now(),
2708            last_called: None,
2709            call_count: 0,
2710            avg_execution_time: Duration::from_millis(0),
2711            health_status: ToolHealthStatus::Healthy,
2712            tags: self.tags,
2713        };
2714        
2715        Ok(Tool {
2716            definition,
2717            handler,
2718            metadata,
2719        })
2720    }
2721}
2722
2723/// Simple function-based tool handler
2724pub struct FunctionToolHandler<F> {
2725    function: F,
2726}
2727
2728impl<F, Fut> ToolHandler for FunctionToolHandler<F>
2729where
2730    F: Fn(Value) -> Fut + Send + Sync,
2731    Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
2732{
2733    fn execute(&self, arguments: Value) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>> {
2734        Box::pin((self.function)(arguments))
2735    }
2736}
2737
2738impl<F> FunctionToolHandler<F> {
2739    /// Create a new function-based tool handler
2740    pub fn new(function: F) -> Self {
2741        Self { function }
2742    }
2743}
2744
2745/// MCP service descriptor for discovery and routing
2746impl MCPService {
2747    /// Create a new MCP service descriptor
2748    pub fn new(service_id: String, node_id: PeerId) -> Self {
2749        Self {
2750            service_id,
2751            node_id,
2752            tools: Vec::new(),
2753            capabilities: MCPCapabilities {
2754                experimental: None,
2755                sampling: None,
2756                tools: Some(MCPToolsCapability {
2757                    list_changed: Some(true),
2758                }),
2759                prompts: None,
2760                resources: None,
2761                logging: None,
2762            },
2763            metadata: MCPServiceMetadata {
2764                name: "MCP Service".to_string(),
2765                version: "1.0.0".to_string(),
2766                description: None,
2767                tags: Vec::new(),
2768                health_status: ServiceHealthStatus::Healthy,
2769                load_metrics: ServiceLoadMetrics {
2770                    active_requests: 0,
2771                    requests_per_second: 0.0,
2772                    avg_response_time_ms: 0.0,
2773                    error_rate: 0.0,
2774                    cpu_usage: 0.0,
2775                    memory_usage: 0,
2776                },
2777            },
2778            registered_at: SystemTime::now(),
2779            endpoint: MCPEndpoint {
2780                protocol: "p2p".to_string(),
2781                address: "".to_string(),
2782                port: None,
2783                tls: false,
2784                auth_required: false,
2785            },
2786        }
2787    }
2788}
2789
2790impl Default for MCPCapabilities {
2791    fn default() -> Self {
2792        Self {
2793            experimental: None,
2794            sampling: None,
2795            tools: Some(MCPToolsCapability {
2796                list_changed: Some(true),
2797            }),
2798            prompts: Some(MCPPromptsCapability {
2799                list_changed: Some(true),
2800            }),
2801            resources: Some(MCPResourcesCapability {
2802                subscribe: Some(true),
2803                list_changed: Some(true),
2804            }),
2805            logging: Some(MCPLoggingCapability {
2806                levels: Some(vec![
2807                    MCPLogLevel::Debug,
2808                    MCPLogLevel::Info,
2809                    MCPLogLevel::Warning,
2810                    MCPLogLevel::Error,
2811                ]),
2812            }),
2813        }
2814    }
2815}
2816
2817#[cfg(test)]
2818mod tests {
2819    use super::*;
2820    use crate::dht::{DHT, DHTConfig, Key};
2821    use std::pin::Pin;
2822    use std::future::Future;
2823    use tokio::time::timeout;
2824
2825    /// Test implementation of ToolHandler for unit tests
2826    struct TestTool {
2827        name: String,
2828        should_error: bool,
2829        execution_time: Duration,
2830    }
2831
2832    impl TestTool {
2833        fn new(name: &str) -> Self {
2834            Self {
2835                name: name.to_string(),
2836                should_error: false,
2837                execution_time: Duration::from_millis(10),
2838            }
2839        }
2840
2841        fn with_error(mut self) -> Self {
2842            self.should_error = true;
2843            self
2844        }
2845
2846        fn with_execution_time(mut self, duration: Duration) -> Self {
2847            self.execution_time = duration;
2848            self
2849        }
2850    }
2851
2852    impl ToolHandler for TestTool {
2853        fn execute(&self, arguments: Value) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
2854            let should_error = self.should_error;
2855            let execution_time = self.execution_time;
2856            let name = self.name.clone();
2857
2858            Box::pin(async move {
2859                tokio::time::sleep(execution_time).await;
2860
2861                if should_error {
2862                    return Err(P2PError::MCP(format!("Test error from tool {}", name)).into());
2863                }
2864
2865                // Echo back the arguments with a response marker
2866                Ok(json!({
2867                    "tool": name,
2868                    "arguments": arguments,
2869                    "result": "success"
2870                }))
2871            })
2872        }
2873
2874        fn validate(&self, arguments: &Value) -> Result<()> {
2875            if !arguments.is_object() {
2876                return Err(P2PError::MCP("Arguments must be an object".to_string()).into());
2877            }
2878            Ok(())
2879        }
2880
2881        fn get_requirements(&self) -> ToolRequirements {
2882            ToolRequirements {
2883                max_memory: Some(1024 * 1024), // 1MB
2884                max_execution_time: Some(Duration::from_secs(5)),
2885                required_capabilities: vec!["test".to_string()],
2886                requires_network: false,
2887                requires_filesystem: false,
2888            }
2889        }
2890    }
2891
2892    /// Helper function to create a test MCP server
2893    async fn create_test_mcp_server() -> MCPServer {
2894        let config = MCPServerConfig {
2895            server_name: "test_server".to_string(),
2896            server_version: "1.0.0".to_string(),
2897            enable_auth: false,
2898            enable_rate_limiting: false,
2899            max_concurrent_requests: 10,
2900            request_timeout: Duration::from_secs(30),
2901            enable_dht_discovery: true,
2902            rate_limit_rpm: 60,
2903            enable_logging: true,
2904            max_tool_execution_time: Duration::from_secs(30),
2905            tool_memory_limit: 100 * 1024 * 1024,
2906            health_monitor: HealthMonitorConfig::default(),
2907        };
2908
2909        MCPServer::new(config)
2910    }
2911
2912    /// Helper function to create a test tool
2913    fn create_test_tool(name: &str) -> Tool {
2914        Tool {
2915            definition: MCPTool {
2916                name: name.to_string(),
2917                description: format!("Test tool: {}", name),
2918                input_schema: json!({
2919                    "type": "object",
2920                    "properties": {
2921                        "input": { "type": "string" }
2922                    }
2923                }),
2924            },
2925            handler: Box::new(TestTool::new(name)),
2926            metadata: ToolMetadata {
2927                created_at: SystemTime::now(),
2928                last_called: None,
2929                call_count: 0,
2930                avg_execution_time: Duration::from_millis(0),
2931                health_status: ToolHealthStatus::Healthy,
2932                tags: vec!["test".to_string()],
2933            },
2934        }
2935    }
2936
2937    /// Helper function to create a test DHT
2938    async fn create_test_dht() -> DHT {
2939        let local_id = Key::new(b"test_node_id");
2940        let config = DHTConfig::default();
2941        DHT::new(local_id, config)
2942    }
2943
2944    /// Helper function to create an MCP call context
2945    fn create_test_context(caller_id: PeerId) -> MCPCallContext {
2946        MCPCallContext {
2947            caller_id,
2948            timestamp: SystemTime::now(),
2949            timeout: Duration::from_secs(30),
2950            auth_info: None,
2951            metadata: HashMap::new(),
2952        }
2953    }
2954
2955    #[tokio::test]
2956    async fn test_mcp_server_creation() {
2957        let server = create_test_mcp_server().await;
2958        assert_eq!(server.config.server_name, "test_server");
2959        assert_eq!(server.config.server_version, "1.0.0");
2960        assert!(!server.config.enable_auth);
2961        assert!(!server.config.enable_rate_limiting);
2962    }
2963
2964    #[tokio::test]
2965    async fn test_tool_registration() -> Result<()> {
2966        let server = create_test_mcp_server().await;
2967        let tool = create_test_tool("test_calculator");
2968
2969        // Register the tool
2970        server.register_tool(tool).await?;
2971
2972        // Verify tool is registered
2973        let tools = server.tools.read().await;
2974        assert!(tools.contains_key("test_calculator"));
2975        assert_eq!(tools.get("test_calculator").unwrap().definition.name, "test_calculator");
2976
2977        // Verify stats updated
2978        let stats = server.stats.read().await;
2979        assert_eq!(stats.total_tools, 1);
2980
2981        Ok(())
2982    }
2983
2984    #[tokio::test]
2985    async fn test_tool_registration_duplicate() -> Result<()> {
2986        let server = create_test_mcp_server().await;
2987        let tool1 = create_test_tool("duplicate_tool");
2988        let tool2 = create_test_tool("duplicate_tool");
2989
2990        // Register first tool
2991        server.register_tool(tool1).await?;
2992
2993        // Try to register duplicate - should fail
2994        let result = server.register_tool(tool2).await;
2995        assert!(result.is_err());
2996        assert!(result.unwrap_err().to_string().contains("Tool already exists"));
2997
2998        Ok(())
2999    }
3000
3001    #[tokio::test]
3002    async fn test_tool_validation() {
3003        let server = create_test_mcp_server().await;
3004
3005        // Test invalid tool name (empty)
3006        let mut invalid_tool = create_test_tool("");
3007        let result = server.validate_tool(&invalid_tool).await;
3008        assert!(result.is_err());
3009
3010        // Test invalid tool name (too long)
3011        invalid_tool.definition.name = "a".repeat(200);
3012        let result = server.validate_tool(&invalid_tool).await;
3013        assert!(result.is_err());
3014
3015        // Test invalid schema (not an object)
3016        let mut invalid_schema_tool = create_test_tool("valid_name");
3017        invalid_schema_tool.definition.input_schema = json!("not an object");
3018        let result = server.validate_tool(&invalid_schema_tool).await;
3019        assert!(result.is_err());
3020
3021        // Test valid tool
3022        let valid_tool = create_test_tool("valid_tool");
3023        let result = server.validate_tool(&valid_tool).await;
3024        assert!(result.is_ok());
3025    }
3026
3027    #[tokio::test]
3028    async fn test_tool_call_success() -> Result<()> {
3029        let server = create_test_mcp_server().await;
3030        let tool = create_test_tool("success_tool");
3031        server.register_tool(tool).await?;
3032
3033        let caller_id = "test_peer_123".to_string();
3034        let context = create_test_context(caller_id);
3035        let arguments = json!({"input": "test data"});
3036
3037        let result = server.call_tool("success_tool", arguments.clone(), context).await?;
3038
3039        // Verify response structure
3040        assert_eq!(result["tool"], "success_tool");
3041        assert_eq!(result["arguments"], arguments);
3042        assert_eq!(result["result"], "success");
3043
3044        // Verify tool metadata updated
3045        let tools = server.tools.read().await;
3046        let tool_metadata = &tools.get("success_tool").unwrap().metadata;
3047        assert_eq!(tool_metadata.call_count, 1);
3048        assert!(tool_metadata.last_called.is_some());
3049
3050        Ok(())
3051    }
3052
3053    #[tokio::test]
3054    async fn test_tool_call_nonexistent() -> Result<()> {
3055        let server = create_test_mcp_server().await;
3056        let caller_id = "test_peer_456".to_string();
3057        let context = create_test_context(caller_id);
3058        let arguments = json!({"input": "test"});
3059
3060        let result = server.call_tool("nonexistent_tool", arguments, context).await;
3061        assert!(result.is_err());
3062        assert!(result.unwrap_err().to_string().contains("Tool not found"));
3063
3064        Ok(())
3065    }
3066
3067    #[tokio::test]
3068    async fn test_tool_call_handler_error() -> Result<()> {
3069        let server = create_test_mcp_server().await;
3070        let tool = Tool {
3071            definition: MCPTool {
3072                name: "error_tool".to_string(),
3073                description: "Tool that always errors".to_string(),
3074                input_schema: json!({"type": "object"}),
3075            },
3076            handler: Box::new(TestTool::new("error_tool").with_error()),
3077            metadata: ToolMetadata {
3078                created_at: SystemTime::now(),
3079                last_called: None,
3080                call_count: 0,
3081                avg_execution_time: Duration::from_millis(0),
3082                health_status: ToolHealthStatus::Healthy,
3083                tags: vec![],
3084            },
3085        };
3086
3087        server.register_tool(tool).await?;
3088
3089        let caller_id = "test_peer_error".to_string();
3090        let context = create_test_context(caller_id);
3091        let arguments = json!({"input": "test"});
3092
3093        let result = server.call_tool("error_tool", arguments, context).await;
3094        assert!(result.is_err());
3095        assert!(result.unwrap_err().to_string().contains("Test error from tool error_tool"));
3096
3097        Ok(())
3098    }
3099
3100    #[tokio::test]
3101    async fn test_tool_call_timeout() -> Result<()> {
3102        let server = create_test_mcp_server().await;
3103        let slow_tool = Tool {
3104            definition: MCPTool {
3105                name: "slow_tool".to_string(),
3106                description: "Tool that takes too long".to_string(),
3107                input_schema: json!({"type": "object"}),
3108            },
3109            handler: Box::new(TestTool::new("slow_tool").with_execution_time(Duration::from_secs(2))),
3110            metadata: ToolMetadata {
3111                created_at: SystemTime::now(),
3112                last_called: None,
3113                call_count: 0,
3114                avg_execution_time: Duration::from_millis(0),
3115                health_status: ToolHealthStatus::Healthy,
3116                tags: vec![],
3117            },
3118        };
3119
3120        server.register_tool(slow_tool).await?;
3121
3122        let caller_id = "test_peer_error".to_string();
3123        let context = create_test_context(caller_id);
3124        let arguments = json!({"input": "test"});
3125
3126        // Test with very short timeout
3127        let result = timeout(
3128            Duration::from_millis(100),
3129            server.call_tool("slow_tool", arguments, context)
3130        ).await;
3131
3132        assert!(result.is_err()); // Should timeout
3133
3134        Ok(())
3135    }
3136
3137    #[tokio::test]
3138    async fn test_tool_requirements() {
3139        let tool = TestTool::new("req_tool");
3140        let requirements = tool.get_requirements();
3141
3142        assert_eq!(requirements.max_memory, Some(1024 * 1024));
3143        assert_eq!(requirements.max_execution_time, Some(Duration::from_secs(5)));
3144        assert_eq!(requirements.required_capabilities, vec!["test"]);
3145        assert!(!requirements.requires_network);
3146        assert!(!requirements.requires_filesystem);
3147    }
3148
3149    #[tokio::test]
3150    async fn test_tool_validation_handler() {
3151        let tool = TestTool::new("validation_tool");
3152
3153        // Valid arguments (object)
3154        let valid_args = json!({"key": "value"});
3155        assert!(tool.validate(&valid_args).is_ok());
3156
3157        // Invalid arguments (not an object)
3158        let invalid_args = json!("not an object");
3159        assert!(tool.validate(&invalid_args).is_err());
3160
3161        let invalid_args = json!(123);
3162        assert!(tool.validate(&invalid_args).is_err());
3163    }
3164
3165    #[tokio::test]
3166    async fn test_tool_health_status() {
3167        let mut metadata = ToolMetadata {
3168            created_at: SystemTime::now(),
3169            last_called: None,
3170            call_count: 0,
3171            avg_execution_time: Duration::from_millis(0),
3172            health_status: ToolHealthStatus::Healthy,
3173            tags: vec![],
3174        };
3175
3176        // Test different health statuses
3177        assert_eq!(metadata.health_status, ToolHealthStatus::Healthy);
3178
3179        metadata.health_status = ToolHealthStatus::Degraded;
3180        assert_eq!(metadata.health_status, ToolHealthStatus::Degraded);
3181
3182        metadata.health_status = ToolHealthStatus::Unhealthy;
3183        assert_eq!(metadata.health_status, ToolHealthStatus::Unhealthy);
3184
3185        metadata.health_status = ToolHealthStatus::Disabled;
3186        assert_eq!(metadata.health_status, ToolHealthStatus::Disabled);
3187    }
3188
3189    #[tokio::test]
3190    async fn test_mcp_capabilities() {
3191        let server = create_test_mcp_server().await;
3192        let capabilities = server.get_server_capabilities().await;
3193
3194        assert!(capabilities.tools.is_some());
3195        assert!(capabilities.prompts.is_some());
3196        assert!(capabilities.resources.is_some());
3197        assert!(capabilities.logging.is_some());
3198
3199        let tools_cap = capabilities.tools.unwrap();
3200        assert_eq!(tools_cap.list_changed, Some(true));
3201
3202        let logging_cap = capabilities.logging.unwrap();
3203        let levels = logging_cap.levels.unwrap();
3204        assert!(levels.contains(&MCPLogLevel::Debug));
3205        assert!(levels.contains(&MCPLogLevel::Info));
3206        assert!(levels.contains(&MCPLogLevel::Warning));
3207        assert!(levels.contains(&MCPLogLevel::Error));
3208    }
3209
3210    #[tokio::test]
3211    async fn test_mcp_message_serialization() {
3212        // Test Initialize message
3213        let init_msg = MCPMessage::Initialize {
3214            protocol_version: MCP_VERSION.to_string(),
3215            capabilities: MCPCapabilities {
3216                experimental: None,
3217                sampling: None,
3218                tools: Some(MCPToolsCapability { list_changed: Some(true) }),
3219                prompts: None,
3220                resources: None,
3221                logging: None,
3222            },
3223            client_info: MCPClientInfo {
3224                name: "test_client".to_string(),
3225                version: "1.0.0".to_string(),
3226            },
3227        };
3228
3229        let serialized = serde_json::to_string(&init_msg).unwrap();
3230        let deserialized: MCPMessage = serde_json::from_str(&serialized).unwrap();
3231
3232        match deserialized {
3233            MCPMessage::Initialize { protocol_version, client_info, .. } => {
3234                assert_eq!(protocol_version, MCP_VERSION);
3235                assert_eq!(client_info.name, "test_client");
3236                assert_eq!(client_info.version, "1.0.0");
3237            }
3238            _ => panic!("Wrong message type after deserialization"),
3239        }
3240    }
3241
3242    #[tokio::test]
3243    async fn test_mcp_content_types() {
3244        // Test text content
3245        let text_content = MCPContent::Text {
3246            text: "Hello, world!".to_string(),
3247        };
3248
3249        let serialized = serde_json::to_string(&text_content).unwrap();
3250        let deserialized: MCPContent = serde_json::from_str(&serialized).unwrap();
3251
3252        match deserialized {
3253            MCPContent::Text { text } => assert_eq!(text, "Hello, world!"),
3254            _ => panic!("Wrong content type"),
3255        }
3256
3257        // Test image content
3258        let image_content = MCPContent::Image {
3259            data: "base64data".to_string(),
3260            mime_type: "image/png".to_string(),
3261        };
3262
3263        let serialized = serde_json::to_string(&image_content).unwrap();
3264        let deserialized: MCPContent = serde_json::from_str(&serialized).unwrap();
3265
3266        match deserialized {
3267            MCPContent::Image { data, mime_type } => {
3268                assert_eq!(data, "base64data");
3269                assert_eq!(mime_type, "image/png");
3270            }
3271            _ => panic!("Wrong content type"),
3272        }
3273    }
3274
3275    #[tokio::test]
3276    async fn test_service_health_status() {
3277        let mut metrics = ServiceLoadMetrics {
3278            active_requests: 0,
3279            requests_per_second: 0.0,
3280            avg_response_time_ms: 0.0,
3281            error_rate: 0.0,
3282            cpu_usage: 0.0,
3283            memory_usage: 0,
3284        };
3285
3286        // Test healthy service
3287        let metadata = MCPServiceMetadata {
3288            name: "test_service".to_string(),
3289            version: "1.0.0".to_string(),
3290            description: Some("Test service".to_string()),
3291            tags: vec!["test".to_string()],
3292            health_status: ServiceHealthStatus::Healthy,
3293            load_metrics: metrics.clone(),
3294        };
3295
3296        assert_eq!(metadata.health_status, ServiceHealthStatus::Healthy);
3297
3298        // Test different health statuses
3299        metrics.error_rate = 0.5; // 50% error rate
3300        let degraded_metadata = MCPServiceMetadata {
3301            health_status: ServiceHealthStatus::Degraded,
3302            load_metrics: metrics.clone(),
3303            ..metadata.clone()
3304        };
3305
3306        assert_eq!(degraded_metadata.health_status, ServiceHealthStatus::Degraded);
3307
3308        let unhealthy_metadata = MCPServiceMetadata {
3309            health_status: ServiceHealthStatus::Unhealthy,
3310            ..metadata.clone()
3311        };
3312
3313        assert_eq!(unhealthy_metadata.health_status, ServiceHealthStatus::Unhealthy);
3314    }
3315
3316    #[tokio::test]
3317    async fn test_p2p_mcp_message() {
3318        let source_peer = "source_peer_123".to_string();
3319        let target_peer = "target_peer_456".to_string();
3320
3321        let p2p_message = P2PMCPMessage {
3322            message_type: P2PMCPMessageType::Request,
3323            message_id: uuid::Uuid::new_v4().to_string(),
3324            source_peer: source_peer.clone(),
3325            target_peer: Some(target_peer.clone()),
3326            timestamp: SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(),
3327            payload: MCPMessage::ListTools { cursor: None },
3328            ttl: 10,
3329        };
3330
3331        // Test serialization
3332        let serialized = serde_json::to_string(&p2p_message).unwrap();
3333        let deserialized: P2PMCPMessage = serde_json::from_str(&serialized).unwrap();
3334
3335        assert_eq!(deserialized.message_type, P2PMCPMessageType::Request);
3336        assert_eq!(deserialized.source_peer, source_peer);
3337        assert_eq!(deserialized.target_peer, Some(target_peer));
3338        assert_eq!(deserialized.ttl, 10);
3339
3340        match deserialized.payload {
3341            MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3342            _ => panic!("Wrong message payload type"),
3343        }
3344    }
3345
3346    #[tokio::test]
3347    async fn test_tool_requirements_default() {
3348        let default_requirements = ToolRequirements::default();
3349
3350        assert_eq!(default_requirements.max_memory, Some(100 * 1024 * 1024));
3351        assert_eq!(default_requirements.max_execution_time, Some(Duration::from_secs(30)));
3352        assert!(default_requirements.required_capabilities.is_empty());
3353        assert!(!default_requirements.requires_network);
3354        assert!(!default_requirements.requires_filesystem);
3355    }
3356
3357    #[tokio::test]
3358    async fn test_mcp_server_stats() {
3359        let server = create_test_mcp_server().await;
3360
3361        // Initial stats should be zero
3362        let stats = server.stats.read().await;
3363        assert_eq!(stats.total_tools, 0);
3364        assert_eq!(stats.total_requests, 0);
3365        assert_eq!(stats.total_responses, 0);
3366        assert_eq!(stats.total_errors, 0);
3367
3368        drop(stats);
3369
3370        // Register a tool and verify stats update
3371        let tool = create_test_tool("stats_test_tool");
3372        server.register_tool(tool).await.unwrap();
3373
3374        let stats = server.stats.read().await;
3375        assert_eq!(stats.total_tools, 1);
3376    }
3377
3378    #[tokio::test]
3379    async fn test_log_levels() {
3380        // Test all log levels serialize/deserialize correctly
3381        let levels = vec![
3382            MCPLogLevel::Debug,
3383            MCPLogLevel::Info,
3384            MCPLogLevel::Notice,
3385            MCPLogLevel::Warning,
3386            MCPLogLevel::Error,
3387            MCPLogLevel::Critical,
3388            MCPLogLevel::Alert,
3389            MCPLogLevel::Emergency,
3390        ];
3391
3392        for level in levels {
3393            let serialized = serde_json::to_string(&level).unwrap();
3394            let deserialized: MCPLogLevel = serde_json::from_str(&serialized).unwrap();
3395            assert_eq!(level as u8, deserialized as u8);
3396        }
3397    }
3398
3399    #[tokio::test]
3400    async fn test_mcp_endpoint() {
3401        let endpoint = MCPEndpoint {
3402            protocol: "p2p".to_string(),
3403            address: "127.0.0.1".to_string(),
3404            port: Some(9000),
3405            tls: true,
3406            auth_required: true,
3407        };
3408
3409        let serialized = serde_json::to_string(&endpoint).unwrap();
3410        let deserialized: MCPEndpoint = serde_json::from_str(&serialized).unwrap();
3411
3412        assert_eq!(deserialized.protocol, "p2p");
3413        assert_eq!(deserialized.address, "127.0.0.1");
3414        assert_eq!(deserialized.port, Some(9000));
3415        assert!(deserialized.tls);
3416        assert!(deserialized.auth_required);
3417    }
3418
3419    #[tokio::test]
3420    async fn test_mcp_service_metadata() {
3421        let load_metrics = ServiceLoadMetrics {
3422            active_requests: 5,
3423            requests_per_second: 10.5,
3424            avg_response_time_ms: 250.0,
3425            error_rate: 0.01,
3426            cpu_usage: 45.5,
3427            memory_usage: 1024 * 1024 * 100, // 100MB
3428        };
3429
3430        let metadata = MCPServiceMetadata {
3431            name: "test_service".to_string(),
3432            version: "2.1.0".to_string(),
3433            description: Some("A test service for unit testing".to_string()),
3434            tags: vec!["test".to_string(), "unit".to_string(), "mcp".to_string()],
3435            health_status: ServiceHealthStatus::Healthy,
3436            load_metrics,
3437        };
3438
3439        // Test serialization
3440        let serialized = serde_json::to_string(&metadata).unwrap();
3441        let deserialized: MCPServiceMetadata = serde_json::from_str(&serialized).unwrap();
3442
3443        assert_eq!(deserialized.name, "test_service");
3444        assert_eq!(deserialized.version, "2.1.0");
3445        assert_eq!(deserialized.description, Some("A test service for unit testing".to_string()));
3446        assert_eq!(deserialized.tags, vec!["test", "unit", "mcp"]);
3447        assert_eq!(deserialized.health_status, ServiceHealthStatus::Healthy);
3448        assert_eq!(deserialized.load_metrics.active_requests, 5);
3449        assert_eq!(deserialized.load_metrics.requests_per_second, 10.5);
3450    }
3451
3452    #[tokio::test]
3453    async fn test_function_tool_handler() {
3454        // Test function tool handler creation and execution
3455        let handler = FunctionToolHandler::new(|args: Value| async move {
3456            let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("world");
3457            Ok(json!({"greeting": format!("Hello, {}!", name)}))
3458        });
3459
3460        let args = json!({"name": "Alice"});
3461        let result = handler.execute(args).await.unwrap();
3462        assert_eq!(result["greeting"], "Hello, Alice!");
3463
3464        // Test with missing argument
3465        let empty_args = json!({});
3466        let result = handler.execute(empty_args).await.unwrap();
3467        assert_eq!(result["greeting"], "Hello, world!");
3468    }
3469
3470    #[tokio::test]
3471    async fn test_mcp_service_creation() {
3472        let service_id = "test_service_123".to_string();
3473        let node_id = "test_node_789".to_string();
3474
3475        let service = MCPService::new(service_id.clone(), node_id.clone());
3476
3477        assert_eq!(service.service_id, service_id);
3478        assert_eq!(service.node_id, node_id);
3479        assert!(service.tools.is_empty());
3480        assert_eq!(service.metadata.name, "MCP Service");
3481        assert_eq!(service.metadata.version, "1.0.0");
3482        assert_eq!(service.metadata.health_status, ServiceHealthStatus::Healthy);
3483        assert_eq!(service.endpoint.protocol, "p2p");
3484        assert!(!service.endpoint.tls);
3485        assert!(!service.endpoint.auth_required);
3486    }
3487
3488    #[tokio::test]
3489    async fn test_mcp_capabilities_default() {
3490        let capabilities = MCPCapabilities::default();
3491
3492        assert!(capabilities.tools.is_some());
3493        assert!(capabilities.prompts.is_some());
3494        assert!(capabilities.resources.is_some());
3495        assert!(capabilities.logging.is_some());
3496
3497        let tools_cap = capabilities.tools.unwrap();
3498        assert_eq!(tools_cap.list_changed, Some(true));
3499
3500        let resources_cap = capabilities.resources.unwrap();
3501        assert_eq!(resources_cap.subscribe, Some(true));
3502        assert_eq!(resources_cap.list_changed, Some(true));
3503
3504        let logging_cap = capabilities.logging.unwrap();
3505        let levels = logging_cap.levels.unwrap();
3506        assert!(levels.contains(&MCPLogLevel::Debug));
3507        assert!(levels.contains(&MCPLogLevel::Info));
3508        assert!(levels.contains(&MCPLogLevel::Warning));
3509        assert!(levels.contains(&MCPLogLevel::Error));
3510    }
3511
3512    #[tokio::test]
3513    async fn test_mcp_request_creation() {
3514        let source_peer = "source_peer_123".to_string();
3515        let target_peer = "target_peer_456".to_string();
3516
3517        let request = MCPRequest {
3518            request_id: uuid::Uuid::new_v4().to_string(),
3519            source_peer: source_peer.clone(),
3520            target_peer: target_peer.clone(),
3521            message: MCPMessage::ListTools { cursor: None },
3522            timestamp: SystemTime::now(),
3523            timeout: Duration::from_secs(30),
3524            auth_token: Some("test_token".to_string()),
3525        };
3526
3527        assert_eq!(request.source_peer, source_peer);
3528        assert_eq!(request.target_peer, target_peer);
3529        assert_eq!(request.timeout, Duration::from_secs(30));
3530        assert_eq!(request.auth_token, Some("test_token".to_string()));
3531
3532        match request.message {
3533            MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3534            _ => panic!("Wrong message type"),
3535        }
3536    }
3537
3538    #[tokio::test]
3539    async fn test_p2p_message_types() {
3540        // Test all P2P message types
3541        assert_eq!(P2PMCPMessageType::Request, P2PMCPMessageType::Request);
3542        assert_eq!(P2PMCPMessageType::Response, P2PMCPMessageType::Response);
3543        assert_eq!(P2PMCPMessageType::ServiceAdvertisement, P2PMCPMessageType::ServiceAdvertisement);
3544        assert_eq!(P2PMCPMessageType::ServiceDiscovery, P2PMCPMessageType::ServiceDiscovery);
3545
3546        // Test serialization of each type
3547        for msg_type in [
3548            P2PMCPMessageType::Request,
3549            P2PMCPMessageType::Response,
3550            P2PMCPMessageType::ServiceAdvertisement,
3551            P2PMCPMessageType::ServiceDiscovery,
3552        ] {
3553            let serialized = serde_json::to_string(&msg_type).unwrap();
3554            let deserialized: P2PMCPMessageType = serde_json::from_str(&serialized).unwrap();
3555            assert_eq!(msg_type, deserialized);
3556        }
3557    }
3558}