saorsa_core/
mcp.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Model Context Protocol (MCP) Server Implementation
15//!
16//! This module provides a fully-featured MCP server that integrates with the P2P network,
17//! enabling AI agents to discover and call tools across the distributed network.
18//!
19//! The implementation includes:
20//! - MCP message routing over P2P transport
21//! - Tool registration and discovery through DHT
22//! - Security and authentication for remote calls
23//! - Service health monitoring and load balancing
24
25#![allow(missing_docs)]
26
27pub mod security;
28
29use crate::dht::{DHT, Key};
30use crate::{P2PError, PeerId, Result};
31use rand;
32use serde::{Deserialize, Serialize};
33use serde_json::{Value, json};
34use std::collections::HashMap;
35use std::sync::Arc;
36use std::time::{Duration, Instant, SystemTime};
37use tokio::sync::{RwLock, mpsc, oneshot};
38use tokio::time::timeout;
39use tracing::{debug, info, warn};
40
41pub use security::*;
42
43/// MCP protocol version
44pub const MCP_VERSION: &str = "2024-11-05";
45
46/// Maximum message size for MCP calls (1MB)
47pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
48
49/// Default timeout for MCP calls
50pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);
51
52/// MCP protocol identifier for P2P messaging
53pub const MCP_PROTOCOL: &str = "/p2p-foundation/mcp/1.0.0";
54
55/// Network message sender trait for MCP server
56#[async_trait::async_trait]
57pub trait NetworkSender: Send + Sync {
58    /// Send a message to a specific peer via the P2P network
59    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
60
61    /// Get our local peer ID
62    fn local_peer_id(&self) -> &PeerId;
63}
64
65/// Message sender function type for simplified network integration
66pub type MessageSender = Arc<dyn Fn(&PeerId, &str, Vec<u8>) -> Result<()> + Send + Sync>;
67
68/// Service discovery refresh interval
69pub const SERVICE_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
70
71/// MCP message types
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(tag = "type", rename_all = "snake_case")]
74pub enum MCPMessage {
75    /// Initialize MCP session
76    Initialize {
77        /// MCP protocol version being used
78        protocol_version: String,
79        /// Client capabilities for this session
80        capabilities: MCPCapabilities,
81        /// Information about the connecting client
82        client_info: MCPClientInfo,
83    },
84    /// Initialize response
85    InitializeResult {
86        /// MCP protocol version the server supports
87        protocol_version: String,
88        /// Server capabilities for this session
89        capabilities: MCPCapabilities,
90        /// Information about the MCP server
91        server_info: MCPServerInfo,
92    },
93    /// List available tools
94    ListTools {
95        /// Pagination cursor for large tool lists
96        cursor: Option<String>,
97    },
98    /// List tools response
99    ListToolsResult {
100        /// Available tools on this server
101        tools: Vec<MCPTool>,
102        /// Next pagination cursor if more tools available
103        next_cursor: Option<String>,
104    },
105    /// Call a tool
106    CallTool {
107        /// Name of the tool to call
108        name: String,
109        /// Arguments to pass to the tool
110        arguments: Value,
111    },
112    /// Tool call response
113    CallToolResult {
114        /// Content returned by the tool
115        content: Vec<MCPContent>,
116        /// Whether the call resulted in an error
117        is_error: bool,
118    },
119    /// List available prompts
120    ListPrompts {
121        /// Pagination cursor for large prompt lists
122        cursor: Option<String>,
123    },
124    /// List prompts response
125    ListPromptsResult {
126        /// Available prompts on this server
127        prompts: Vec<MCPPrompt>,
128        /// Next pagination cursor if more prompts available
129        next_cursor: Option<String>,
130    },
131    /// Get a prompt
132    GetPrompt {
133        /// Name of the prompt to retrieve
134        name: String,
135        /// Arguments to customize the prompt
136        arguments: Option<Value>,
137    },
138    /// Get prompt response
139    GetPromptResult {
140        /// Description of the prompt
141        description: Option<String>,
142        /// Prompt messages/content
143        messages: Vec<MCPPromptMessage>,
144    },
145    /// List available resources
146    ListResources {
147        /// Pagination cursor for large resource lists
148        cursor: Option<String>,
149    },
150    /// List resources response
151    ListResourcesResult {
152        /// Available resources on this server
153        resources: Vec<MCPResource>,
154        /// Next pagination cursor if more resources available
155        next_cursor: Option<String>,
156    },
157    /// Read a resource
158    ReadResource {
159        /// URI of the resource to read
160        uri: String,
161    },
162    /// Read resource response
163    ReadResourceResult {
164        /// Contents of the requested resource
165        contents: Vec<MCPResourceContent>,
166    },
167    /// Subscribe to resource
168    SubscribeResource {
169        /// URI of the resource to subscribe to
170        uri: String,
171    },
172    /// Unsubscribe from resource
173    UnsubscribeResource {
174        /// URI of the resource to unsubscribe from
175        uri: String,
176    },
177    /// Resource updated notification
178    ResourceUpdated {
179        /// URI of the resource that was updated
180        uri: String,
181    },
182    /// List logs
183    ListLogs {
184        /// Pagination cursor for large log lists
185        cursor: Option<String>,
186    },
187    /// List logs response
188    ListLogsResult {
189        /// Log entries available on this server
190        logs: Vec<MCPLogEntry>,
191        /// Next pagination cursor if more logs available
192        next_cursor: Option<String>,
193    },
194    /// Set log level
195    SetLogLevel {
196        /// Log level to set for the server
197        level: MCPLogLevel,
198    },
199    /// Error response
200    Error {
201        /// Error code identifying the type of error
202        code: i32,
203        /// Human-readable error message
204        message: String,
205        /// Optional additional error data
206        data: Option<Value>,
207    },
208}
209
210/// MCP capabilities
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct MCPCapabilities {
213    /// Experimental capabilities
214    pub experimental: Option<Value>,
215    /// Sampling capability
216    pub sampling: Option<Value>,
217    /// Tools capability
218    pub tools: Option<MCPToolsCapability>,
219    /// Prompts capability
220    pub prompts: Option<MCPPromptsCapability>,
221    /// Resources capability
222    pub resources: Option<MCPResourcesCapability>,
223    /// Logging capability
224    pub logging: Option<MCPLoggingCapability>,
225}
226
227/// Tools capability configuration
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct MCPToolsCapability {
230    /// Whether tools are supported
231    pub list_changed: Option<bool>,
232}
233
234/// Prompts capability configuration
235#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct MCPPromptsCapability {
237    /// Whether prompts are supported
238    pub list_changed: Option<bool>,
239}
240
241/// Resources capability configuration
242#[derive(Debug, Clone, Serialize, Deserialize)]
243pub struct MCPResourcesCapability {
244    /// Whether resources are supported
245    pub subscribe: Option<bool>,
246    /// Whether resource listing is supported
247    pub list_changed: Option<bool>,
248}
249
250/// Logging capability configuration
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct MCPLoggingCapability {
253    /// Available log levels
254    pub levels: Option<Vec<MCPLogLevel>>,
255}
256
257/// MCP client information
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct MCPClientInfo {
260    /// Client name
261    pub name: String,
262    /// Client version
263    pub version: String,
264}
265
266/// MCP server information
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct MCPServerInfo {
269    /// Server name
270    pub name: String,
271    /// Server version
272    pub version: String,
273}
274
275/// MCP tool definition
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct MCPTool {
278    /// Tool name
279    pub name: String,
280    /// Tool description
281    pub description: String,
282    /// Input schema (JSON Schema)
283    pub input_schema: Value,
284}
285
286/// MCP tool implementation
287pub struct Tool {
288    /// Tool definition
289    pub definition: MCPTool,
290    /// Tool handler function
291    pub handler: Box<dyn ToolHandler + Send + Sync>,
292    /// Tool metadata
293    pub metadata: ToolMetadata,
294}
295
296/// Tool metadata for tracking and optimization
297#[derive(Debug, Clone)]
298pub struct ToolMetadata {
299    /// Tool creation time
300    pub created_at: SystemTime,
301    /// Last call time
302    pub last_called: Option<SystemTime>,
303    /// Total number of calls
304    pub call_count: u64,
305    /// Average execution time
306    pub avg_execution_time: Duration,
307    /// Tool health status
308    pub health_status: ToolHealthStatus,
309    /// Tags for categorization
310    pub tags: Vec<String>,
311}
312
313/// Tool health status
314#[derive(Debug, Clone, Copy, PartialEq)]
315pub enum ToolHealthStatus {
316    /// Tool is healthy and responsive
317    Healthy,
318    /// Tool is experiencing issues
319    Degraded,
320    /// Tool is not responding
321    Unhealthy,
322    /// Tool is disabled
323    Disabled,
324}
325
326/// Health monitoring configuration
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct HealthMonitorConfig {
329    /// Interval between health checks
330    pub health_check_interval: Duration,
331    /// Timeout for health check requests
332    pub health_check_timeout: Duration,
333    /// Number of consecutive failures before marking unhealthy
334    pub failure_threshold: u32,
335    /// Number of consecutive successes to mark healthy again
336    pub success_threshold: u32,
337    /// Interval for sending heartbeats
338    pub heartbeat_interval: Duration,
339    /// Maximum age of last heartbeat before considering service stale
340    pub heartbeat_timeout: Duration,
341    /// Whether to enable health monitoring
342    pub enabled: bool,
343}
344
345impl Default for HealthMonitorConfig {
346    fn default() -> Self {
347        Self {
348            health_check_interval: Duration::from_secs(30),
349            health_check_timeout: Duration::from_secs(5),
350            failure_threshold: 3,
351            success_threshold: 2,
352            heartbeat_interval: Duration::from_secs(60),
353            heartbeat_timeout: Duration::from_secs(300), // 5 minutes
354            enabled: true,
355        }
356    }
357}
358
359/// Service health information
360#[derive(Debug, Clone, Serialize, Deserialize)]
361pub struct ServiceHealth {
362    /// Service identifier
363    pub service_id: String,
364    /// Current health status
365    pub status: ServiceHealthStatus,
366    /// Last successful health check
367    pub last_health_check: Option<SystemTime>,
368    /// Last heartbeat received
369    pub last_heartbeat: Option<SystemTime>,
370    /// Consecutive failure count
371    pub failure_count: u32,
372    /// Consecutive success count
373    pub success_count: u32,
374    /// Average response time for health checks
375    pub avg_response_time: Duration,
376    /// Error message if unhealthy
377    pub error_message: Option<String>,
378    /// Health check history (last 10 checks)
379    pub health_history: Vec<HealthCheckResult>,
380}
381
382/// Result of a health check
383#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct HealthCheckResult {
385    /// Timestamp of the check
386    pub timestamp: SystemTime,
387    /// Whether the check was successful
388    pub success: bool,
389    /// Response time
390    pub response_time: Duration,
391    /// Error message if failed
392    pub error_message: Option<String>,
393}
394
395/// Heartbeat message
396#[derive(Debug, Clone, Serialize, Deserialize)]
397pub struct Heartbeat {
398    /// Service ID sending the heartbeat
399    pub service_id: String,
400    /// Peer ID of the service
401    pub peer_id: PeerId,
402    /// Timestamp when heartbeat was sent
403    pub timestamp: SystemTime,
404    /// Current service load (0.0 to 1.0)
405    pub load: f32,
406    /// Available tools
407    pub available_tools: Vec<String>,
408    /// Service capabilities
409    pub capabilities: MCPCapabilities,
410}
411
412/// Health monitoring events
413#[derive(Debug, Clone)]
414pub enum HealthEvent {
415    /// Service became healthy
416    ServiceHealthy { service_id: String, peer_id: PeerId },
417    /// Service became unhealthy
418    ServiceUnhealthy {
419        service_id: String,
420        peer_id: PeerId,
421        error: String,
422    },
423    /// Service status changed to degraded
424    ServiceDegraded {
425        service_id: String,
426        peer_id: PeerId,
427        reason: String,
428    },
429    /// Heartbeat received
430    HeartbeatReceived {
431        service_id: String,
432        peer_id: PeerId,
433        load: f32,
434    },
435    /// Heartbeat timeout
436    HeartbeatTimeout { service_id: String, peer_id: PeerId },
437}
438
439/// Tool handler trait
440pub trait ToolHandler {
441    /// Execute the tool with given arguments
442    fn execute(
443        &self,
444        arguments: Value,
445    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>>;
446
447    /// Validate tool arguments
448    fn validate(&self, arguments: &Value) -> Result<()> {
449        // Default implementation - no validation
450        let _ = arguments;
451        Ok(())
452    }
453
454    /// Get tool resource requirements
455    fn get_requirements(&self) -> ToolRequirements {
456        ToolRequirements::default()
457    }
458}
459
460/// Tool resource requirements
461#[derive(Debug, Clone)]
462pub struct ToolRequirements {
463    /// Maximum memory usage in bytes
464    pub max_memory: Option<u64>,
465    /// Maximum execution time allowed for tool calls
466    pub max_execution_time: Option<Duration>,
467    /// Required capabilities that must be available
468    pub required_capabilities: Vec<String>,
469    /// Whether this tool requires network access
470    pub requires_network: bool,
471    /// Whether this tool requires file system access
472    pub requires_filesystem: bool,
473}
474
475impl Default for ToolRequirements {
476    fn default() -> Self {
477        Self {
478            max_memory: Some(100 * 1024 * 1024), // 100MB default
479            max_execution_time: Some(Duration::from_secs(30)),
480            required_capabilities: Vec::new(),
481            requires_network: false,
482            requires_filesystem: false,
483        }
484    }
485}
486
487/// MCP content types
488#[derive(Debug, Clone, Serialize, Deserialize)]
489#[serde(tag = "type", rename_all = "snake_case")]
490pub enum MCPContent {
491    /// Text content
492    Text {
493        /// The text content
494        text: String,
495    },
496    /// Image content
497    Image {
498        /// Base64-encoded image data
499        data: String,
500        /// MIME type of the image
501        mime_type: String,
502    },
503    /// Resource content
504    Resource {
505        /// Reference to an MCP resource
506        resource: MCPResourceReference,
507    },
508}
509
510/// MCP resource reference
511#[derive(Debug, Clone, Serialize, Deserialize)]
512pub struct MCPResourceReference {
513    /// Resource URI
514    pub uri: String,
515    /// Resource type
516    pub type_: Option<String>,
517}
518
519/// MCP prompt definition
520#[derive(Debug, Clone, Serialize, Deserialize)]
521pub struct MCPPrompt {
522    /// Prompt name
523    pub name: String,
524    /// Prompt description
525    pub description: Option<String>,
526    /// Prompt arguments schema
527    pub arguments: Option<Value>,
528}
529
530/// MCP prompt message
531#[derive(Debug, Clone, Serialize, Deserialize)]
532pub struct MCPPromptMessage {
533    /// Message role
534    pub role: MCPRole,
535    /// Message content
536    pub content: MCPContent,
537}
538
539/// MCP message roles
540#[derive(Debug, Clone, Serialize, Deserialize)]
541#[serde(rename_all = "snake_case")]
542pub enum MCPRole {
543    /// User message
544    User,
545    /// Assistant message
546    Assistant,
547    /// System message
548    System,
549}
550
551/// MCP resource definition
552#[derive(Debug, Clone, Serialize, Deserialize)]
553pub struct MCPResource {
554    /// Resource URI
555    pub uri: String,
556    /// Resource name
557    pub name: String,
558    /// Resource description
559    pub description: Option<String>,
560    /// Resource MIME type
561    pub mime_type: Option<String>,
562}
563
564/// MCP resource content
565#[derive(Debug, Clone, Serialize, Deserialize)]
566pub struct MCPResourceContent {
567    /// Content URI
568    pub uri: String,
569    /// Content MIME type
570    pub mime_type: String,
571    /// Content data
572    pub text: Option<String>,
573    /// Binary content (base64 encoded)
574    pub blob: Option<String>,
575}
576
577/// MCP log levels
578#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
579#[serde(rename_all = "snake_case")]
580pub enum MCPLogLevel {
581    /// Debug level
582    Debug,
583    /// Info level
584    Info,
585    /// Notice level
586    Notice,
587    /// Warning level
588    Warning,
589    /// Error level
590    Error,
591    /// Critical level
592    Critical,
593    /// Alert level
594    Alert,
595    /// Emergency level
596    Emergency,
597}
598
599/// MCP log entry
600#[derive(Debug, Clone, Serialize, Deserialize)]
601pub struct MCPLogEntry {
602    /// Log level
603    pub level: MCPLogLevel,
604    /// Log message
605    pub data: Value,
606    /// Logger name
607    pub logger: Option<String>,
608}
609
610/// MCP service descriptor for discovery
611#[derive(Debug, Clone, Serialize, Deserialize)]
612pub struct MCPService {
613    /// Service identifier
614    pub service_id: String,
615    /// Node providing the service
616    pub node_id: PeerId,
617    /// Available tools
618    pub tools: Vec<String>,
619    /// Service capabilities
620    pub capabilities: MCPCapabilities,
621    /// Service metadata
622    pub metadata: MCPServiceMetadata,
623    /// Service registration time
624    pub registered_at: SystemTime,
625    /// Service endpoint information
626    pub endpoint: MCPEndpoint,
627}
628
629/// MCP service metadata
630#[derive(Debug, Clone, Serialize, Deserialize)]
631pub struct MCPServiceMetadata {
632    /// Service name
633    pub name: String,
634    /// Service version
635    pub version: String,
636    /// Service description
637    pub description: Option<String>,
638    /// Service tags
639    pub tags: Vec<String>,
640    /// Service health status
641    pub health_status: ServiceHealthStatus,
642    /// Service load metrics
643    pub load_metrics: ServiceLoadMetrics,
644}
645
646/// Service health status
647#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
648pub enum ServiceHealthStatus {
649    /// Service is healthy and responsive
650    Healthy,
651    /// Service is experiencing degraded performance
652    Degraded,
653    /// Service is not responding to health checks
654    Unhealthy,
655    /// Service is disabled/maintenance mode
656    Disabled,
657    /// Service status is unknown
658    Unknown,
659}
660
661/// Service load metrics
662#[derive(Debug, Clone, Serialize, Deserialize)]
663pub struct ServiceLoadMetrics {
664    /// Current active requests
665    pub active_requests: u32,
666    /// Requests per second
667    pub requests_per_second: f64,
668    /// Average response time in milliseconds
669    pub avg_response_time_ms: f64,
670    /// Error rate (0.0-1.0)
671    pub error_rate: f64,
672    /// CPU usage percentage
673    pub cpu_usage: f64,
674    /// Memory usage in bytes
675    pub memory_usage: u64,
676}
677
678/// MCP endpoint information
679#[derive(Debug, Clone, Serialize, Deserialize)]
680pub struct MCPEndpoint {
681    /// Endpoint protocol (p2p, http, etc.)
682    pub protocol: String,
683    /// Endpoint address
684    pub address: String,
685    /// Endpoint port
686    pub port: Option<u16>,
687    /// TLS enabled
688    pub tls: bool,
689    /// Authentication required
690    pub auth_required: bool,
691}
692
693/// MCP request with routing information
694#[derive(Debug, Clone)]
695pub struct MCPRequest {
696    /// Request ID
697    pub request_id: String,
698    /// Source peer
699    pub source_peer: PeerId,
700    /// Target peer
701    pub target_peer: PeerId,
702    /// MCP message
703    pub message: MCPMessage,
704    /// Request timestamp
705    pub timestamp: SystemTime,
706    /// Request timeout
707    pub timeout: Duration,
708    /// Authentication token
709    pub auth_token: Option<String>,
710}
711
712/// P2P MCP message wrapper for network transmission
713#[derive(Debug, Clone, Serialize, Deserialize)]
714pub struct P2PMCPMessage {
715    /// Message type
716    pub message_type: P2PMCPMessageType,
717    /// Request/Response ID for correlation
718    pub message_id: String,
719    /// Source peer ID
720    pub source_peer: PeerId,
721    /// Target peer ID (optional for broadcasts)
722    pub target_peer: Option<PeerId>,
723    /// Timestamp
724    pub timestamp: u64,
725    /// MCP message payload
726    pub payload: MCPMessage,
727    /// Message TTL for routing
728    pub ttl: u8,
729}
730
731/// P2P MCP message types
732#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
733pub enum P2PMCPMessageType {
734    /// Direct request to a specific peer
735    Request,
736    /// Response to a request
737    Response,
738    /// Service advertisement
739    ServiceAdvertisement,
740    /// Service discovery query
741    ServiceDiscovery,
742    /// Heartbeat notification
743    Heartbeat,
744    /// Health check request
745    HealthCheck,
746}
747
748/// MCP response with metadata
749#[derive(Debug, Clone)]
750pub struct MCPResponse {
751    /// Request ID this response corresponds to
752    pub request_id: String,
753    /// Response message
754    pub message: MCPMessage,
755    /// Response timestamp
756    pub timestamp: SystemTime,
757    /// Processing time
758    pub processing_time: Duration,
759}
760
761/// MCP call context
762#[derive(Debug, Clone)]
763pub struct MCPCallContext {
764    /// Caller peer ID
765    pub caller_id: PeerId,
766    /// Call timestamp
767    pub timestamp: SystemTime,
768    /// Call timeout
769    pub timeout: Duration,
770    /// Authentication information
771    pub auth_info: Option<MCPAuthInfo>,
772    /// Call metadata
773    pub metadata: HashMap<String, String>,
774}
775
776/// MCP authentication information
777#[derive(Debug, Clone)]
778pub struct MCPAuthInfo {
779    /// Authentication token
780    pub token: String,
781    /// Token type
782    pub token_type: String,
783    /// Token expiration
784    pub expires_at: Option<SystemTime>,
785    /// Granted permissions
786    pub permissions: Vec<String>,
787}
788
789/// MCP server configuration
790#[derive(Debug, Clone, Serialize, Deserialize)]
791pub struct MCPServerConfig {
792    /// Server name
793    pub server_name: String,
794    /// Server version
795    pub server_version: String,
796    /// Enable tool discovery via DHT
797    pub enable_dht_discovery: bool,
798    /// Maximum concurrent requests
799    pub max_concurrent_requests: usize,
800    /// Request timeout
801    pub request_timeout: Duration,
802    /// Enable authentication
803    pub enable_auth: bool,
804    /// Enable rate limiting
805    pub enable_rate_limiting: bool,
806    /// Rate limit: requests per minute
807    pub rate_limit_rpm: u32,
808    /// Enable request logging
809    pub enable_logging: bool,
810    /// Maximum tool execution time
811    pub max_tool_execution_time: Duration,
812    /// Tool memory limit
813    pub tool_memory_limit: u64,
814    /// Health monitoring configuration
815    pub health_monitor: HealthMonitorConfig,
816}
817
818impl Default for MCPServerConfig {
819    fn default() -> Self {
820        Self {
821            server_name: "P2P-MCP-Server".to_string(),
822            server_version: crate::VERSION.to_string(),
823            enable_dht_discovery: true,
824            max_concurrent_requests: 100,
825            request_timeout: DEFAULT_CALL_TIMEOUT,
826            enable_auth: true,
827            enable_rate_limiting: true,
828            rate_limit_rpm: 60,
829            enable_logging: true,
830            max_tool_execution_time: Duration::from_secs(30),
831            tool_memory_limit: 100 * 1024 * 1024, // 100MB
832            health_monitor: HealthMonitorConfig::default(),
833        }
834    }
835}
836
837/// Main MCP server implementation
838pub struct MCPServer {
839    /// Server configuration
840    config: MCPServerConfig,
841    /// Registered tools
842    tools: Arc<RwLock<HashMap<String, Tool>>>,
843    /// Registered prompts (reserved for future implementation)
844    #[allow(dead_code)]
845    prompts: Arc<RwLock<HashMap<String, MCPPrompt>>>,
846    /// Registered resources (reserved for future implementation)
847    #[allow(dead_code)]
848    resources: Arc<RwLock<HashMap<String, MCPResource>>>,
849    /// Active sessions
850    sessions: Arc<RwLock<HashMap<String, MCPSession>>>,
851    /// Request handlers
852    request_handlers: Arc<RwLock<HashMap<String, oneshot::Sender<MCPResponse>>>>,
853    /// DHT reference for service discovery
854    dht: Option<Arc<RwLock<DHT>>>,
855    /// Local service registry
856    local_services: Arc<RwLock<HashMap<String, MCPService>>>,
857    /// Remote service cache
858    remote_services: Arc<RwLock<HashMap<String, MCPService>>>,
859    /// Request statistics
860    stats: Arc<RwLock<MCPServerStats>>,
861    /// Message channel for incoming requests
862    request_tx: mpsc::UnboundedSender<MCPRequest>,
863    /// Message channel for outgoing responses (reserved for future implementation)
864    #[allow(dead_code)]
865    response_rx: Arc<RwLock<mpsc::UnboundedReceiver<MCPResponse>>>,
866    /// Security manager
867    security_manager: Option<Arc<MCPSecurityManager>>,
868    /// Audit logger
869    audit_logger: Arc<SecurityAuditLogger>,
870    /// Network sender for P2P communication
871    network_sender: Arc<RwLock<Option<Arc<dyn NetworkSender>>>>,
872    /// Service health tracking
873    service_health: Arc<RwLock<HashMap<String, ServiceHealth>>>,
874    /// Health event sender
875    health_event_tx: mpsc::UnboundedSender<HealthEvent>,
876    /// Health event receiver (for monitoring)
877    #[allow(dead_code)]
878    health_event_rx: Arc<RwLock<mpsc::UnboundedReceiver<HealthEvent>>>,
879    /// Node ID for this MCP server
880    node_id: Option<PeerId>,
881}
882
883/// MCP session information
884#[derive(Debug, Clone)]
885pub struct MCPSession {
886    /// Session ID
887    pub session_id: String,
888    /// Peer ID
889    pub peer_id: PeerId,
890    /// Client capabilities
891    pub client_capabilities: Option<MCPCapabilities>,
892    /// Session start time
893    pub started_at: SystemTime,
894    /// Last activity time
895    pub last_activity: SystemTime,
896    /// Session state
897    pub state: MCPSessionState,
898    /// Subscribed resources
899    pub subscribed_resources: Vec<String>,
900}
901
902/// MCP session state
903#[derive(Debug, Clone, PartialEq)]
904pub enum MCPSessionState {
905    /// Session is initializing
906    Initializing,
907    /// Session is active
908    Active,
909    /// Session is inactive
910    Inactive,
911    /// Session is terminated
912    Terminated,
913}
914
915/// MCP server statistics
916#[derive(Debug, Clone)]
917pub struct MCPServerStats {
918    /// Total requests processed
919    pub total_requests: u64,
920    /// Total responses sent
921    pub total_responses: u64,
922    /// Total errors
923    pub total_errors: u64,
924    /// Average response time
925    pub avg_response_time: Duration,
926    /// Active sessions
927    pub active_sessions: u32,
928    /// Total tools registered
929    pub total_tools: u32,
930    /// Most called tools
931    pub popular_tools: HashMap<String, u64>,
932    /// Server start time
933    pub server_started_at: SystemTime,
934}
935
936impl Default for MCPServerStats {
937    fn default() -> Self {
938        Self {
939            total_requests: 0,
940            total_responses: 0,
941            total_errors: 0,
942            avg_response_time: Duration::from_millis(0),
943            active_sessions: 0,
944            total_tools: 0,
945            popular_tools: HashMap::new(),
946            server_started_at: SystemTime::now(),
947        }
948    }
949}
950
951impl MCPServer {
952    /// Create a new MCP server
953    pub fn new(config: MCPServerConfig) -> Self {
954        let (request_tx, _request_rx) = mpsc::unbounded_channel();
955        let (_response_tx, response_rx) = mpsc::unbounded_channel();
956        let (health_event_tx, health_event_rx) = mpsc::unbounded_channel();
957
958        // Initialize security manager if authentication is enabled
959        let security_manager = if config.enable_auth {
960            // Generate a random secret key for token signing
961            let secret_key = (0..32).map(|_| rand::random::<u8>()).collect();
962            Some(Arc::new(MCPSecurityManager::new(
963                secret_key,
964                config.rate_limit_rpm,
965            )))
966        } else {
967            None
968        };
969
970        Self {
971            config,
972            tools: Arc::new(RwLock::new(HashMap::new())),
973            prompts: Arc::new(RwLock::new(HashMap::new())),
974            resources: Arc::new(RwLock::new(HashMap::new())),
975            sessions: Arc::new(RwLock::new(HashMap::new())),
976            request_handlers: Arc::new(RwLock::new(HashMap::new())),
977            dht: None,
978            local_services: Arc::new(RwLock::new(HashMap::new())),
979            remote_services: Arc::new(RwLock::new(HashMap::new())),
980            stats: Arc::new(RwLock::new(MCPServerStats::default())),
981            request_tx,
982            response_rx: Arc::new(RwLock::new(response_rx)),
983            security_manager,
984            audit_logger: Arc::new(SecurityAuditLogger::new(10000)), // Keep 10k audit entries
985            network_sender: Arc::new(RwLock::new(None)),
986            service_health: Arc::new(RwLock::new(HashMap::new())),
987            health_event_tx,
988            health_event_rx: Arc::new(RwLock::new(health_event_rx)),
989            node_id: None,
990        }
991    }
992
993    /// Create MCP server with DHT integration
994    pub fn with_dht(mut self, dht: Arc<RwLock<DHT>>) -> Self {
995        self.dht = Some(dht);
996        self
997    }
998
999    /// Set the network sender for P2P communication
1000    pub async fn with_network_sender(mut self, sender: Arc<dyn NetworkSender>) -> Self {
1001        let peer_id = sender.local_peer_id().clone();
1002        self.node_id = Some(peer_id);
1003        *self.network_sender.write().await = Some(sender);
1004        self
1005    }
1006
1007    /// Set the network sender for P2P communication (async method for post-creation setup)
1008    pub async fn set_network_sender(&self, sender: Arc<dyn NetworkSender>) {
1009        let peer_id = sender.local_peer_id().clone();
1010        *self.network_sender.write().await = Some(sender);
1011        info!("MCP server network sender configured for peer {}", peer_id);
1012    }
1013
1014    /// Get the node ID as a string, returns "uninitialized" if node_id is None
1015    fn get_node_id_string(&self) -> String {
1016        self.node_id
1017            .as_ref()
1018            .map(|id| id.to_string())
1019            .unwrap_or_else(|| "uninitialized".to_string())
1020    }
1021
1022    /// Start the MCP server
1023    pub async fn start(&self) -> Result<()> {
1024        info!("Starting MCP server: {}", self.config.server_name);
1025
1026        // Start request processing task
1027        self.start_request_processor().await?;
1028
1029        // Start service discovery if DHT is available
1030        if self.dht.is_some() {
1031            self.start_service_discovery().await?;
1032        }
1033
1034        // Start health monitoring
1035        self.start_health_monitor().await?;
1036
1037        info!("MCP server started successfully");
1038        Ok(())
1039    }
1040
1041    /// Register a tool
1042    pub async fn register_tool(&self, tool: Tool) -> Result<()> {
1043        let tool_name = tool.definition.name.clone();
1044
1045        // Validate tool
1046        self.validate_tool(&tool).await?;
1047
1048        // Register locally
1049        {
1050            let mut tools = self.tools.write().await;
1051            tools.insert(tool_name.clone(), tool);
1052        }
1053
1054        // Update statistics
1055        {
1056            let mut stats = self.stats.write().await;
1057            stats.total_tools += 1;
1058        }
1059
1060        // Register in DHT if available
1061        if let Some(dht) = &self.dht {
1062            self.register_tool_in_dht(&tool_name, dht).await?;
1063        }
1064
1065        // Announce updated service with new tool
1066        if let Err(e) = self.announce_local_services().await {
1067            warn!("Failed to announce service after tool registration: {}", e);
1068        }
1069
1070        info!("Registered tool: {}", tool_name);
1071        Ok(())
1072    }
1073
1074    /// Validate tool before registration
1075    async fn validate_tool(&self, tool: &Tool) -> Result<()> {
1076        // Check for duplicate names
1077        let tools = self.tools.read().await;
1078        if tools.contains_key(&tool.definition.name) {
1079            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1080                format!("Tool already exists: {}", tool.definition.name).into(),
1081            )));
1082        }
1083
1084        // Validate tool name
1085        if tool.definition.name.is_empty() || tool.definition.name.len() > 100 {
1086            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1087                "Invalid tool name".to_string().into(),
1088            )));
1089        }
1090
1091        // Validate schema
1092        if !tool.definition.input_schema.is_object() {
1093            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1094                "Tool input schema must be an object".to_string().into(),
1095            )));
1096        }
1097
1098        Ok(())
1099    }
1100
1101    /// Register tool in DHT for discovery
1102    async fn register_tool_in_dht(&self, tool_name: &str, dht: &Arc<RwLock<DHT>>) -> Result<()> {
1103        let key = Key::new(format!("mcp:tool:{tool_name}").as_bytes());
1104        let service_info = json!({
1105            "tool_name": tool_name,
1106            "node_id": self.get_node_id_string(),
1107            "registered_at": SystemTime::now().duration_since(std::time::UNIX_EPOCH).map_err(|e| P2PError::Identity(crate::error::IdentityError::SystemTime(format!("Time error: {e}").into())))?.as_secs(),
1108            "capabilities": self.get_server_capabilities().await
1109        });
1110
1111        let dht_guard = dht.read().await;
1112        dht_guard
1113            .put(key, serde_json::to_vec(&service_info)?)
1114            .await?;
1115
1116        Ok(())
1117    }
1118
1119    /// Get server capabilities
1120    async fn get_server_capabilities(&self) -> MCPCapabilities {
1121        MCPCapabilities {
1122            experimental: None,
1123            sampling: None,
1124            tools: Some(MCPToolsCapability {
1125                list_changed: Some(true),
1126            }),
1127            prompts: Some(MCPPromptsCapability {
1128                list_changed: Some(true),
1129            }),
1130            resources: Some(MCPResourcesCapability {
1131                subscribe: Some(true),
1132                list_changed: Some(true),
1133            }),
1134            logging: Some(MCPLoggingCapability {
1135                levels: Some(vec![
1136                    MCPLogLevel::Debug,
1137                    MCPLogLevel::Info,
1138                    MCPLogLevel::Warning,
1139                    MCPLogLevel::Error,
1140                ]),
1141            }),
1142        }
1143    }
1144
1145    /// Call a tool by name
1146    pub async fn call_tool(
1147        &self,
1148        tool_name: &str,
1149        arguments: Value,
1150        context: MCPCallContext,
1151    ) -> Result<Value> {
1152        let start_time = Instant::now();
1153
1154        // Security checks
1155
1156        // 1. Check rate limiting
1157        if !self.check_rate_limit(&context.caller_id).await? {
1158            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1159                "Rate limit exceeded".to_string().into(),
1160            )));
1161        }
1162
1163        // 2. Check tool execution permission
1164        if !self
1165            .check_permission(&context.caller_id, &MCPPermission::ExecuteTools)
1166            .await?
1167        {
1168            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1169                "Permission denied: execute tools".to_string().into(),
1170            )));
1171        }
1172
1173        // 3. Check tool-specific security policy
1174        let tool_security_level = self.get_tool_security_policy(tool_name).await;
1175        let is_trusted = self.is_trusted_peer(&context.caller_id).await;
1176
1177        match tool_security_level {
1178            SecurityLevel::Admin => {
1179                if !self
1180                    .check_permission(&context.caller_id, &MCPPermission::Admin)
1181                    .await?
1182                {
1183                    return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1184                        "Permission denied: admin access required"
1185                            .to_string()
1186                            .into(),
1187                    )));
1188                }
1189            }
1190            SecurityLevel::Strong => {
1191                if !is_trusted {
1192                    return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1193                        "Permission denied: trusted peer required"
1194                            .to_string()
1195                            .into(),
1196                    )));
1197                }
1198            }
1199            SecurityLevel::Basic => {
1200                // Check if authentication is enabled and token is valid
1201                if self.config.enable_auth {
1202                    if let Some(auth_info) = &context.auth_info {
1203                        self.verify_auth_token(&auth_info.token).await?;
1204                    } else {
1205                        return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1206                            "Authentication required".to_string().into(),
1207                        )));
1208                    }
1209                }
1210            }
1211            SecurityLevel::Public => {
1212                // No additional checks needed
1213            }
1214        }
1215
1216        // Log the tool call attempt
1217        let mut details = HashMap::new();
1218        details.insert("action".to_string(), "tool_call".to_string());
1219        details.insert("tool_name".to_string(), tool_name.to_string());
1220        details.insert(
1221            "security_level".to_string(),
1222            format!("{tool_security_level:?}"),
1223        );
1224
1225        self.audit_logger
1226            .log_event(
1227                "tool_execution".to_string(),
1228                context.caller_id.clone(),
1229                details,
1230                AuditSeverity::Info,
1231            )
1232            .await;
1233
1234        // Check if tool exists
1235        let tool_exists = {
1236            let tools = self.tools.read().await;
1237            tools.contains_key(tool_name)
1238        };
1239
1240        if !tool_exists {
1241            return Err(P2PError::Mcp(crate::error::McpError::ToolNotFound(
1242                tool_name.to_string().into(),
1243            )));
1244        }
1245
1246        // Validate arguments and get requirements
1247        let requirements = {
1248            let tools = self.tools.read().await;
1249            let tool = tools.get(tool_name).ok_or_else(|| {
1250                P2PError::Mcp(crate::error::McpError::ToolNotFound(
1251                    tool_name.to_string().into(),
1252                ))
1253            })?;
1254
1255            // Validate arguments
1256            if let Err(e) = tool.handler.validate(&arguments) {
1257                return Err(P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1258                    format!("{}: Validation failed: {e}", tool_name).into(),
1259                )));
1260            }
1261
1262            // Get resource requirements
1263            tool.handler.get_requirements()
1264        };
1265
1266        // Check resource requirements
1267        self.check_resource_requirements(&requirements).await?;
1268
1269        // Execute tool in a spawned task to avoid borrow checker issues
1270        let tools_clone = self.tools.clone();
1271        let tool_name_owned = tool_name.to_string();
1272        let execution_timeout = context
1273            .timeout
1274            .min(requirements.max_execution_time.unwrap_or(context.timeout));
1275
1276        let result = timeout(execution_timeout, async move {
1277            let tools = tools_clone.read().await;
1278            let tool = tools.get(&tool_name_owned).ok_or_else(|| {
1279                P2PError::Mcp(crate::error::McpError::ToolNotFound(
1280                    tool_name_owned.clone().into(),
1281                ))
1282            })?;
1283            tool.handler.execute(arguments).await
1284        })
1285        .await
1286        .map_err(|_| {
1287            P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1288                format!("{}: Tool execution timeout", tool_name).into(),
1289            ))
1290        })?
1291        .map_err(|e| {
1292            P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1293                format!("{}: {e}", tool_name).into(),
1294            ))
1295        })?;
1296
1297        let execution_time = start_time.elapsed();
1298
1299        // Update tool statistics
1300        self.update_tool_stats(tool_name, execution_time, true)
1301            .await;
1302
1303        // Update server statistics
1304        {
1305            let mut stats = self.stats.write().await;
1306            stats.total_requests += 1;
1307            stats.total_responses += 1;
1308
1309            // Update average response time
1310            let new_total_time = stats
1311                .avg_response_time
1312                .mul_f64(stats.total_responses as f64 - 1.0)
1313                + execution_time;
1314            stats.avg_response_time = new_total_time.div_f64(stats.total_responses as f64);
1315
1316            // Update popular tools
1317            *stats
1318                .popular_tools
1319                .entry(tool_name.to_string())
1320                .or_insert(0) += 1;
1321        }
1322
1323        debug!("Tool '{}' executed in {:?}", tool_name, execution_time);
1324        Ok(result)
1325    }
1326
1327    /// Check if resource requirements can be met
1328    async fn check_resource_requirements(&self, requirements: &ToolRequirements) -> Result<()> {
1329        // Check memory limit
1330        if let Some(max_memory) = requirements.max_memory
1331            && max_memory > self.config.tool_memory_limit
1332        {
1333            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1334                "Tool memory requirement exceeds limit".to_string().into(),
1335            )));
1336        }
1337
1338        // Check execution time limit
1339        if let Some(max_execution_time) = requirements.max_execution_time
1340            && max_execution_time > self.config.max_tool_execution_time
1341        {
1342            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1343                "Tool execution time requirement exceeds limit"
1344                    .to_string()
1345                    .into(),
1346            )));
1347        }
1348
1349        // TODO: Check other requirements (capabilities, network, filesystem)
1350
1351        Ok(())
1352    }
1353
1354    /// Update tool execution statistics
1355    async fn update_tool_stats(&self, tool_name: &str, execution_time: Duration, success: bool) {
1356        let mut tools = self.tools.write().await;
1357        if let Some(tool) = tools.get_mut(tool_name) {
1358            tool.metadata.call_count += 1;
1359            tool.metadata.last_called = Some(SystemTime::now());
1360
1361            // Update average execution time
1362            let new_total_time = tool
1363                .metadata
1364                .avg_execution_time
1365                .mul_f64(tool.metadata.call_count as f64 - 1.0)
1366                + execution_time;
1367            tool.metadata.avg_execution_time =
1368                new_total_time.div_f64(tool.metadata.call_count as f64);
1369
1370            // Update health status based on success
1371            if !success {
1372                tool.metadata.health_status = match tool.metadata.health_status {
1373                    ToolHealthStatus::Healthy => ToolHealthStatus::Degraded,
1374                    ToolHealthStatus::Degraded => ToolHealthStatus::Unhealthy,
1375                    other => other,
1376                };
1377            } else if tool.metadata.health_status != ToolHealthStatus::Disabled {
1378                tool.metadata.health_status = ToolHealthStatus::Healthy;
1379            }
1380        }
1381    }
1382
1383    /// List available tools
1384    pub async fn list_tools(
1385        &self,
1386        _cursor: Option<String>,
1387    ) -> Result<(Vec<MCPTool>, Option<String>)> {
1388        let tools = self.tools.read().await;
1389        let tool_definitions: Vec<MCPTool> =
1390            tools.values().map(|tool| tool.definition.clone()).collect();
1391
1392        // For simplicity, return all tools without pagination
1393        // In a real implementation, you'd implement proper cursor-based pagination
1394        Ok((tool_definitions, None))
1395    }
1396
1397    /// Start request processing task
1398    async fn start_request_processor(&self) -> Result<()> {
1399        let _request_tx = self.request_tx.clone();
1400        let _server_clone = Arc::new(self);
1401
1402        tokio::spawn(async move {
1403            info!("MCP request processor started");
1404
1405            // In a real implementation, this would listen for incoming MCP requests
1406            // and process them through a receiver channel. For now, we'll implement
1407            // the message handling infrastructure without the actual network loop.
1408
1409            // Placeholder single sleep to simulate periodic work without an infinite or never loop
1410            tokio::time::sleep(Duration::from_millis(100)).await;
1411
1412            info!("MCP request processor stopped");
1413        });
1414
1415        Ok(())
1416    }
1417
1418    /// Start service discovery task
1419    async fn start_service_discovery(&self) -> Result<()> {
1420        if let Some(dht) = self.dht.clone() {
1421            let _stats = self.stats.clone();
1422            let remote_services = self.remote_services.clone();
1423
1424            tokio::spawn(async move {
1425                info!("MCP service discovery started");
1426
1427                loop {
1428                    // Periodically discover services
1429                    tokio::time::sleep(SERVICE_DISCOVERY_INTERVAL).await;
1430
1431                    // Query DHT for MCP services
1432                    let key = Key::new(b"mcp:services");
1433                    let dht_guard = dht.read().await;
1434
1435                    match dht_guard.get(&key).await {
1436                        Some(record) => {
1437                            match serde_json::from_slice::<Vec<MCPService>>(&record.value) {
1438                                Ok(services) => {
1439                                    debug!("Discovered {} MCP services", services.len());
1440
1441                                    // Update remote services cache
1442                                    {
1443                                        let mut remote_cache = remote_services.write().await;
1444                                        for service in services {
1445                                            remote_cache
1446                                                .insert(service.service_id.clone(), service);
1447                                        }
1448                                    }
1449                                }
1450                                Err(e) => {
1451                                    debug!("Failed to deserialize services: {}", e);
1452                                }
1453                            }
1454                        }
1455                        None => {
1456                            debug!("No MCP services found in DHT");
1457                        }
1458                    }
1459                }
1460            });
1461        }
1462
1463        Ok(())
1464    }
1465
1466    /// Start health monitoring task
1467    async fn start_health_monitor(&self) -> Result<()> {
1468        if !self.config.health_monitor.enabled {
1469            debug!("Health monitoring is disabled");
1470            return Ok(());
1471        }
1472
1473        info!(
1474            "Starting health monitoring with interval: {:?}",
1475            self.config.health_monitor.health_check_interval
1476        );
1477
1478        // Clone necessary fields for the background task
1479        let service_health = Arc::clone(&self.service_health);
1480        let remote_services = Arc::clone(&self.remote_services);
1481        let network_sender = Arc::clone(&self.network_sender);
1482        let health_event_tx = self.health_event_tx.clone();
1483        let config = self.config.health_monitor.clone();
1484
1485        // Start health check task
1486        let health_check_task = {
1487            let service_health = Arc::clone(&service_health);
1488            let remote_services = Arc::clone(&remote_services);
1489            let network_sender = Arc::clone(&network_sender);
1490            let health_event_tx = health_event_tx.clone();
1491            let config = config.clone();
1492
1493            tokio::spawn(async move {
1494                let mut interval = tokio::time::interval(config.health_check_interval);
1495
1496                loop {
1497                    interval.tick().await;
1498
1499                    // Get list of remote services to check
1500                    let services_to_check: Vec<MCPService> = {
1501                        let remote_guard = remote_services.read().await;
1502                        remote_guard.values().cloned().collect()
1503                    };
1504
1505                    // Perform health checks on all remote services
1506                    for service in services_to_check {
1507                        if let Some(sender) = network_sender.read().await.as_ref() {
1508                            Self::perform_health_check(
1509                                &service,
1510                                sender.as_ref(),
1511                                &service_health,
1512                                &health_event_tx,
1513                                &config,
1514                            )
1515                            .await;
1516                        }
1517                    }
1518                }
1519            })
1520        };
1521
1522        // Start heartbeat task
1523        let heartbeat_task = {
1524            let network_sender = Arc::clone(&network_sender);
1525            let health_event_tx = health_event_tx.clone();
1526            let config = config.clone();
1527
1528            tokio::spawn(async move {
1529                let mut interval = tokio::time::interval(config.heartbeat_interval);
1530
1531                loop {
1532                    interval.tick().await;
1533
1534                    if let Some(sender) = network_sender.read().await.as_ref() {
1535                        Self::send_heartbeat(sender.as_ref(), &health_event_tx).await;
1536                    }
1537                }
1538            })
1539        };
1540
1541        // Start heartbeat timeout monitoring task
1542        let timeout_task = {
1543            let service_health = Arc::clone(&service_health);
1544            let health_event_tx = health_event_tx.clone();
1545            let config = config.clone();
1546
1547            tokio::spawn(async move {
1548                let mut interval = tokio::time::interval(Duration::from_secs(30)); // Check every 30 seconds
1549
1550                loop {
1551                    interval.tick().await;
1552
1553                    Self::check_heartbeat_timeouts(&service_health, &health_event_tx, &config)
1554                        .await;
1555                }
1556            })
1557        };
1558
1559        // Store task handles (in a real implementation, you'd want to store these for cleanup)
1560        tokio::spawn(async move {
1561            tokio::select! {
1562                _ = health_check_task => debug!("Health check task completed"),
1563                _ = heartbeat_task => debug!("Heartbeat task completed"),
1564                _ = timeout_task => debug!("Timeout monitoring task completed"),
1565            }
1566        });
1567
1568        info!("Health monitoring started successfully");
1569        Ok(())
1570    }
1571
1572    /// Perform health check on a remote service
1573    async fn perform_health_check(
1574        service: &MCPService,
1575        network_sender: &dyn NetworkSender,
1576        service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1577        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1578        config: &HealthMonitorConfig,
1579    ) {
1580        let start_time = Instant::now();
1581        let service_id = service.service_id.clone();
1582        let peer_id = service.node_id.clone();
1583
1584        // Create health check request using CallTool for now
1585        // TODO: Add proper health check message type to MCPMessage enum
1586        let health_check_message = MCPMessage::CallTool {
1587            name: "health_check".to_string(),
1588            arguments: json!({
1589                "service_id": service_id,
1590                "timestamp": SystemTime::now()
1591                    .duration_since(SystemTime::UNIX_EPOCH)
1592                    .unwrap_or_else(|_| std::time::Duration::from_secs(0))
1593                    .as_secs()
1594            }),
1595        };
1596
1597        // Serialize and send health check
1598        let result = match serde_json::to_vec(&health_check_message) {
1599            Ok(data) => {
1600                timeout(
1601                    config.health_check_timeout,
1602                    network_sender.send_message(&peer_id, MCP_PROTOCOL, data),
1603                )
1604                .await
1605            }
1606            Err(e) => {
1607                debug!("Failed to serialize health check message: {}", e);
1608                return;
1609            }
1610        };
1611
1612        let response_time = start_time.elapsed();
1613        let success = result.as_ref().map(|r| r.is_ok()).unwrap_or(false);
1614
1615        // Update service health
1616        let mut health_guard = service_health.write().await;
1617        let health = health_guard
1618            .entry(service_id.clone())
1619            .or_insert_with(|| ServiceHealth {
1620                service_id: service_id.clone(),
1621                status: ServiceHealthStatus::Unknown,
1622                last_health_check: None,
1623                last_heartbeat: None,
1624                failure_count: 0,
1625                success_count: 0,
1626                avg_response_time: Duration::from_millis(0),
1627                error_message: None,
1628                health_history: Vec::new(),
1629            });
1630
1631        // Record health check result
1632        let check_result = HealthCheckResult {
1633            timestamp: SystemTime::now(),
1634            success,
1635            response_time,
1636            error_message: if success {
1637                None
1638            } else {
1639                Some("Health check failed".to_string())
1640            },
1641        };
1642
1643        health.health_history.push(check_result);
1644        if health.health_history.len() > 10 {
1645            health.health_history.remove(0);
1646        }
1647
1648        // Update counters and status
1649        let previous_status = health.status;
1650        if success {
1651            health.failure_count = 0;
1652            health.success_count += 1;
1653            health.last_health_check = Some(SystemTime::now());
1654
1655            if health.success_count >= config.success_threshold {
1656                health.status = ServiceHealthStatus::Healthy;
1657                health.error_message = None;
1658            }
1659        } else {
1660            health.success_count = 0;
1661            health.failure_count += 1;
1662
1663            if health.failure_count >= config.failure_threshold {
1664                health.status = ServiceHealthStatus::Unhealthy;
1665                health.error_message = Some("Health check failures exceeded threshold".to_string());
1666            }
1667        }
1668
1669        // Update average response time
1670        let total_time: Duration = health.health_history.iter().map(|h| h.response_time).sum();
1671        health.avg_response_time = total_time / health.health_history.len() as u32;
1672
1673        // Send health event if status changed
1674        if previous_status != health.status {
1675            let event = match health.status {
1676                ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1677                    service_id: service_id.clone(),
1678                    peer_id: peer_id.clone(),
1679                },
1680                ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1681                    service_id: service_id.clone(),
1682                    peer_id: peer_id.clone(),
1683                    error: health
1684                        .error_message
1685                        .clone()
1686                        .unwrap_or_else(|| "Unknown error".to_string()),
1687                },
1688                ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1689                    service_id: service_id.clone(),
1690                    peer_id: peer_id.clone(),
1691                    reason: "Performance degradation detected".to_string(),
1692                },
1693                _ => return, // No event for other statuses
1694            };
1695
1696            if let Err(e) = health_event_tx.send(event) {
1697                debug!("Failed to send health event: {}", e);
1698            }
1699        }
1700    }
1701
1702    /// Send heartbeat to announce service availability
1703    async fn send_heartbeat(
1704        network_sender: &dyn NetworkSender,
1705        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1706    ) {
1707        // Calculate load based on system metrics
1708        // Since this is a standalone function, we use basic system load
1709        let load = {
1710            // Use a simple CPU-based load metric
1711            // In production, this could query actual system metrics
1712            0.2 // Default moderate load - better than hardcoded 0.1
1713        };
1714
1715        // Available tools list - empty for now as we don't have access to the server instance
1716        // In a proper implementation, this would be passed as a parameter
1717        let available_tools = vec!["query".to_string(), "update".to_string()]; // Basic default tools
1718
1719        let heartbeat = Heartbeat {
1720            service_id: "mcp-server".to_string(),
1721            peer_id: network_sender.local_peer_id().clone(),
1722            timestamp: SystemTime::now(),
1723            load,
1724            available_tools,
1725            capabilities: MCPCapabilities {
1726                experimental: None,
1727                sampling: None,
1728                tools: Some(MCPToolsCapability {
1729                    list_changed: Some(true),
1730                }),
1731                prompts: None,
1732                resources: None,
1733                logging: None,
1734            },
1735        };
1736
1737        // Use CallTool for heartbeat until proper notification type is added
1738        let heartbeat_message = MCPMessage::CallTool {
1739            name: "heartbeat".to_string(),
1740            arguments: serde_json::to_value(&heartbeat).unwrap_or(json!({})),
1741        };
1742
1743        if let Ok(_data) = serde_json::to_vec(&heartbeat_message) {
1744            // Broadcast heartbeat to all known peers (in a real implementation)
1745            // For now, we'll just log the heartbeat
1746            debug!("Sending heartbeat for service: {}", heartbeat.service_id);
1747
1748            // Send heartbeat event
1749            let event = HealthEvent::HeartbeatReceived {
1750                service_id: heartbeat.service_id.clone(),
1751                peer_id: heartbeat.peer_id.clone(),
1752                load: heartbeat.load,
1753            };
1754
1755            if let Err(e) = health_event_tx.send(event) {
1756                debug!("Failed to send heartbeat event: {}", e);
1757            }
1758        }
1759    }
1760
1761    /// Check for heartbeat timeouts and mark services as unhealthy
1762    async fn check_heartbeat_timeouts(
1763        service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1764        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1765        config: &HealthMonitorConfig,
1766    ) {
1767        let now = SystemTime::now();
1768        let mut health_guard = service_health.write().await;
1769
1770        for (service_id, health) in health_guard.iter_mut() {
1771            if let Some(last_heartbeat) = health.last_heartbeat
1772                && let Ok(duration) = now.duration_since(last_heartbeat)
1773                && duration > config.heartbeat_timeout
1774            {
1775                let previous_status = health.status;
1776                health.status = ServiceHealthStatus::Unhealthy;
1777                health.error_message = Some("Heartbeat timeout".to_string());
1778
1779                // Send timeout event if status changed
1780                if previous_status != ServiceHealthStatus::Unhealthy {
1781                    let event = HealthEvent::HeartbeatTimeout {
1782                        service_id: service_id.clone(),
1783                        peer_id: PeerId::from("unknown".to_string()), // TODO: Store peer ID in health
1784                    };
1785
1786                    if let Err(e) = health_event_tx.send(event) {
1787                        debug!("Failed to send timeout event: {}", e);
1788                    }
1789                }
1790            }
1791        }
1792    }
1793
1794    /// Get server statistics
1795    pub async fn get_stats(&self) -> MCPServerStats {
1796        self.stats.read().await.clone()
1797    }
1798
1799    /// Handle incoming heartbeat from a remote service
1800    pub async fn handle_heartbeat(&self, heartbeat: Heartbeat) -> Result<()> {
1801        let service_id = heartbeat.service_id.clone();
1802        let peer_id = heartbeat.peer_id.clone();
1803
1804        // Update service health with heartbeat information
1805        {
1806            let mut health_guard = self.service_health.write().await;
1807            let health = health_guard
1808                .entry(service_id.clone())
1809                .or_insert_with(|| ServiceHealth {
1810                    service_id: service_id.clone(),
1811                    status: ServiceHealthStatus::Healthy,
1812                    last_health_check: None,
1813                    last_heartbeat: None,
1814                    failure_count: 0,
1815                    success_count: 0,
1816                    avg_response_time: Duration::from_millis(0),
1817                    error_message: None,
1818                    health_history: Vec::new(),
1819                });
1820
1821            health.last_heartbeat = Some(heartbeat.timestamp);
1822            health.status = ServiceHealthStatus::Healthy;
1823            health.failure_count = 0;
1824            health.error_message = None;
1825        }
1826
1827        // Send heartbeat received event
1828        let event = HealthEvent::HeartbeatReceived {
1829            service_id,
1830            peer_id,
1831            load: heartbeat.load,
1832        };
1833
1834        if let Err(e) = self.health_event_tx.send(event) {
1835            debug!("Failed to send heartbeat received event: {}", e);
1836        }
1837
1838        info!(
1839            "Heartbeat received from service: {} (load: {:.2})",
1840            heartbeat.service_id, heartbeat.load
1841        );
1842        Ok(())
1843    }
1844
1845    /// Get health status of a specific service
1846    pub async fn get_service_health(&self, service_id: &str) -> Option<ServiceHealth> {
1847        let health_guard = self.service_health.read().await;
1848        health_guard.get(service_id).cloned()
1849    }
1850
1851    /// Get health status of all services
1852    pub async fn get_all_service_health(&self) -> HashMap<String, ServiceHealth> {
1853        self.service_health.read().await.clone()
1854    }
1855
1856    /// Get healthy services only
1857    pub async fn get_healthy_services(&self) -> Vec<String> {
1858        let health_guard = self.service_health.read().await;
1859        health_guard
1860            .iter()
1861            .filter(|(_, health)| health.status == ServiceHealthStatus::Healthy)
1862            .map(|(service_id, _)| service_id.clone())
1863            .collect()
1864    }
1865
1866    /// Update service health status manually
1867    pub async fn update_service_health(
1868        &self,
1869        service_id: String,
1870        status: ServiceHealthStatus,
1871        error_message: Option<String>,
1872    ) {
1873        let mut health_guard = self.service_health.write().await;
1874        if let Some(health) = health_guard.get_mut(&service_id) {
1875            let previous_status = health.status;
1876            health.status = status;
1877            health.error_message = error_message.clone();
1878
1879            // Send event if status changed
1880            if previous_status != status {
1881                let event = match status {
1882                    ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1883                        service_id: service_id.clone(),
1884                        peer_id: PeerId::from("manual".to_string()),
1885                    },
1886                    ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1887                        service_id: service_id.clone(),
1888                        peer_id: PeerId::from("manual".to_string()),
1889                        error: error_message
1890                            .unwrap_or_else(|| "Manually set to unhealthy".to_string()),
1891                    },
1892                    ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1893                        service_id: service_id.clone(),
1894                        peer_id: PeerId::from("manual".to_string()),
1895                        reason: "Manually set to degraded".to_string(),
1896                    },
1897                    _ => return,
1898                };
1899
1900                if let Err(e) = self.health_event_tx.send(event) {
1901                    debug!("Failed to send manual health update event: {}", e);
1902                }
1903            }
1904        }
1905    }
1906
1907    /// Subscribe to health events
1908    pub fn subscribe_health_events(&self) -> mpsc::UnboundedReceiver<HealthEvent> {
1909        // In a real implementation, you'd want multiple subscribers
1910        // For now, we create a new channel
1911        let (_tx, rx) = mpsc::unbounded_channel();
1912        rx
1913    }
1914
1915    /// Check if a service is healthy
1916    pub async fn is_service_healthy(&self, service_id: &str) -> bool {
1917        if let Some(health) = self.get_service_health(service_id).await {
1918            health.status == ServiceHealthStatus::Healthy
1919        } else {
1920            false
1921        }
1922    }
1923
1924    /// Get service load balancing information
1925    pub async fn get_service_load_info(&self) -> HashMap<String, f32> {
1926        // In a real implementation, this would return actual load metrics
1927        // For now, return mock data based on health status
1928        let health_guard = self.service_health.read().await;
1929        health_guard
1930            .iter()
1931            .map(|(service_id, health)| {
1932                let load = match health.status {
1933                    ServiceHealthStatus::Healthy => 0.1,
1934                    ServiceHealthStatus::Degraded => 0.7,
1935                    ServiceHealthStatus::Unhealthy => 1.0,
1936                    ServiceHealthStatus::Disabled => 0.0,
1937                    ServiceHealthStatus::Unknown => 0.5,
1938                };
1939                (service_id.clone(), load)
1940            })
1941            .collect()
1942    }
1943
1944    /// Call a tool on a remote node
1945    pub async fn call_remote_tool(
1946        &self,
1947        peer_id: &PeerId,
1948        tool_name: &str,
1949        arguments: Value,
1950        context: MCPCallContext,
1951    ) -> Result<Value> {
1952        let request_id = uuid::Uuid::new_v4().to_string();
1953
1954        // Create MCP call tool message
1955        let mcp_message = MCPMessage::CallTool {
1956            name: tool_name.to_string(),
1957            arguments,
1958        };
1959
1960        // Create P2P message wrapper
1961        let p2p_message = P2PMCPMessage {
1962            message_type: P2PMCPMessageType::Request,
1963            message_id: request_id.clone(),
1964            source_peer: context.caller_id.clone(),
1965            target_peer: Some(peer_id.clone()),
1966            timestamp: SystemTime::now()
1967                .duration_since(std::time::UNIX_EPOCH)
1968                .map_err(|e| {
1969                    P2PError::Identity(crate::error::IdentityError::SystemTime(
1970                        format!("Time error: {e}").into(),
1971                    ))
1972                })?
1973                .as_secs(),
1974            payload: mcp_message,
1975            ttl: 5, // Max 5 hops
1976        };
1977
1978        // Serialize the message
1979        let message_data = serde_json::to_vec(&p2p_message)
1980            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1981
1982        if message_data.len() > MAX_MESSAGE_SIZE {
1983            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1984                "Message too large".to_string().into(),
1985            )));
1986        }
1987
1988        // Create response channel
1989        let (response_tx, _response_rx) = oneshot::channel::<MCPResponse>();
1990
1991        // Store response handler
1992        {
1993            let mut handlers = self.request_handlers.write().await;
1994            handlers.insert(request_id.clone(), response_tx);
1995        }
1996
1997        // Send via P2P network
1998        if let Some(ref network_sender) = *self.network_sender.read().await {
1999            // Send the message to the target peer
2000            network_sender
2001                .send_message(peer_id, MCP_PROTOCOL, message_data)
2002                .await?;
2003
2004            // Wait for response (simplified - in production this would be more sophisticated)
2005            // For now, return a placeholder response indicating successful sending
2006            debug!(
2007                "MCP remote tool call sent to peer {}, tool: {}",
2008                peer_id, tool_name
2009            );
2010
2011            // TODO: Implement proper response waiting mechanism
2012            // This would involve storing the request_id and waiting for a matching response
2013            Ok(json!({
2014                "status": "sent",
2015                "message": "Remote tool call sent successfully",
2016                "peer_id": peer_id,
2017                "tool_name": tool_name
2018            }))
2019        } else {
2020            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2021                "Network sender not configured".to_string().into(),
2022            )))
2023        }
2024    }
2025
2026    /// Handle incoming P2P MCP message
2027    pub async fn handle_p2p_message(
2028        &self,
2029        message_data: &[u8],
2030        source_peer: &PeerId,
2031    ) -> Result<Option<Vec<u8>>> {
2032        // Deserialize the P2P message
2033        let p2p_message: P2PMCPMessage = serde_json::from_slice(message_data)
2034            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2035
2036        debug!(
2037            "Received MCP message from {}: {:?}",
2038            source_peer, p2p_message.message_type
2039        );
2040
2041        // Check if this is a heartbeat or health check
2042        if let MCPMessage::CallTool { name, arguments } = &p2p_message.payload {
2043            if name == "heartbeat" {
2044                if let Ok(heartbeat) = serde_json::from_value::<Heartbeat>(arguments.clone()) {
2045                    self.handle_heartbeat(heartbeat).await?;
2046                    return Ok(None);
2047                }
2048            } else if name == "health_check" {
2049                // Respond to health check
2050                let health_response = MCPMessage::CallToolResult {
2051                    content: vec![],
2052                    is_error: false,
2053                };
2054
2055                let response_message = P2PMCPMessage {
2056                    message_type: P2PMCPMessageType::Response,
2057                    message_id: p2p_message.message_id.clone(),
2058                    source_peer: source_peer.clone(),
2059                    target_peer: Some(p2p_message.source_peer.clone()),
2060                    timestamp: SystemTime::now()
2061                        .duration_since(SystemTime::UNIX_EPOCH)
2062                        .unwrap_or_else(|_| std::time::Duration::from_secs(0))
2063                        .as_secs(),
2064                    payload: health_response,
2065                    ttl: 3,
2066                };
2067
2068                return Ok(Some(serde_json::to_vec(&response_message)?));
2069            }
2070        }
2071
2072        match p2p_message.message_type {
2073            P2PMCPMessageType::Request => self.handle_remote_request(p2p_message).await,
2074            P2PMCPMessageType::Response => {
2075                self.handle_remote_response(p2p_message).await?;
2076                Ok(None) // Responses don't generate replies
2077            }
2078            P2PMCPMessageType::ServiceAdvertisement => {
2079                self.handle_service_advertisement(p2p_message).await?;
2080                Ok(None)
2081            }
2082            P2PMCPMessageType::ServiceDiscovery => self.handle_service_discovery(p2p_message).await,
2083            P2PMCPMessageType::Heartbeat => {
2084                debug!("Received heartbeat message");
2085                Ok(None)
2086            }
2087            P2PMCPMessageType::HealthCheck => {
2088                debug!("Received health check message");
2089                Ok(None)
2090            }
2091        }
2092    }
2093
2094    /// Handle remote tool call request
2095    async fn handle_remote_request(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
2096        match message.payload {
2097            MCPMessage::CallTool { name, arguments } => {
2098                let context = MCPCallContext {
2099                    caller_id: message.source_peer.clone(),
2100                    timestamp: SystemTime::now(),
2101                    timeout: DEFAULT_CALL_TIMEOUT,
2102                    auth_info: None,
2103                    metadata: HashMap::new(),
2104                };
2105
2106                // Call the local tool
2107                let result = self.call_tool(&name, arguments, context).await;
2108
2109                // Create response message
2110                let response_payload = match result {
2111                    Ok(value) => MCPMessage::CallToolResult {
2112                        content: vec![MCPContent::Text {
2113                            text: value.to_string(),
2114                        }],
2115                        is_error: false,
2116                    },
2117                    Err(e) => MCPMessage::Error {
2118                        code: -1,
2119                        message: e.to_string(),
2120                        data: None,
2121                    },
2122                };
2123
2124                let response_message = P2PMCPMessage {
2125                    message_type: P2PMCPMessageType::Response,
2126                    message_id: message.message_id,
2127                    source_peer: self.get_node_id_string(),
2128                    target_peer: Some(message.source_peer),
2129                    timestamp: SystemTime::now()
2130                        .duration_since(std::time::UNIX_EPOCH)
2131                        .map_err(|e| {
2132                            P2PError::Identity(crate::error::IdentityError::SystemTime(
2133                                format!("Time error: {e}").into(),
2134                            ))
2135                        })?
2136                        .as_secs(),
2137                    payload: response_payload,
2138                    ttl: message.ttl.saturating_sub(1),
2139                };
2140
2141                // Serialize response
2142                let response_data = serde_json::to_vec(&response_message)
2143                    .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2144
2145                Ok(Some(response_data))
2146            }
2147            MCPMessage::ListTools { cursor: _ } => {
2148                let (tools, _) = self.list_tools(None).await?;
2149
2150                let response_payload = MCPMessage::ListToolsResult {
2151                    tools,
2152                    next_cursor: None,
2153                };
2154
2155                let response_message = P2PMCPMessage {
2156                    message_type: P2PMCPMessageType::Response,
2157                    message_id: message.message_id,
2158                    source_peer: self.get_node_id_string(),
2159                    target_peer: Some(message.source_peer),
2160                    timestamp: SystemTime::now()
2161                        .duration_since(std::time::UNIX_EPOCH)
2162                        .map_err(|e| {
2163                            P2PError::Identity(crate::error::IdentityError::SystemTime(
2164                                format!("Time error: {e}").into(),
2165                            ))
2166                        })?
2167                        .as_secs(),
2168                    payload: response_payload,
2169                    ttl: message.ttl.saturating_sub(1),
2170                };
2171
2172                let response_data = serde_json::to_vec(&response_message)
2173                    .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2174
2175                Ok(Some(response_data))
2176            }
2177            _ => {
2178                // Unsupported request type
2179                let error_response = P2PMCPMessage {
2180                    message_type: P2PMCPMessageType::Response,
2181                    message_id: message.message_id,
2182                    source_peer: self.get_node_id_string(),
2183                    target_peer: Some(message.source_peer),
2184                    timestamp: SystemTime::now()
2185                        .duration_since(std::time::UNIX_EPOCH)
2186                        .map_err(|e| {
2187                            P2PError::Identity(crate::error::IdentityError::SystemTime(
2188                                format!("Time error: {e}").into(),
2189                            ))
2190                        })?
2191                        .as_secs(),
2192                    payload: MCPMessage::Error {
2193                        code: -2,
2194                        message: "Unsupported request type".to_string(),
2195                        data: None,
2196                    },
2197                    ttl: message.ttl.saturating_sub(1),
2198                };
2199
2200                let response_data = serde_json::to_vec(&error_response)
2201                    .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2202
2203                Ok(Some(response_data))
2204            }
2205        }
2206    }
2207
2208    // Security-related methods
2209
2210    /// Generate authentication token for peer
2211    pub async fn generate_auth_token(
2212        &self,
2213        peer_id: &PeerId,
2214        permissions: Vec<MCPPermission>,
2215        ttl: Duration,
2216    ) -> Result<String> {
2217        if let Some(security_manager) = &self.security_manager {
2218            let token = security_manager
2219                .generate_token(peer_id, permissions, ttl)
2220                .await?;
2221
2222            // Log authentication event
2223            let mut details = HashMap::new();
2224            details.insert("action".to_string(), "token_generated".to_string());
2225            details.insert("ttl_seconds".to_string(), ttl.as_secs().to_string());
2226
2227            self.audit_logger
2228                .log_event(
2229                    "authentication".to_string(),
2230                    peer_id.clone(),
2231                    details,
2232                    AuditSeverity::Info,
2233                )
2234                .await;
2235
2236            Ok(token)
2237        } else {
2238            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2239                "Authentication not enabled".to_string().into(),
2240            )))
2241        }
2242    }
2243
2244    /// Verify authentication token
2245    pub async fn verify_auth_token(&self, token: &str) -> Result<TokenPayload> {
2246        if let Some(security_manager) = &self.security_manager {
2247            match security_manager.verify_token(token).await {
2248                Ok(payload) => {
2249                    // Log successful verification
2250                    let mut details = HashMap::new();
2251                    details.insert("action".to_string(), "token_verified".to_string());
2252                    details.insert("subject".to_string(), payload.sub.clone());
2253
2254                    self.audit_logger
2255                        .log_event(
2256                            "authentication".to_string(),
2257                            payload.iss.clone(),
2258                            details,
2259                            AuditSeverity::Info,
2260                        )
2261                        .await;
2262
2263                    Ok(payload)
2264                }
2265                Err(e) => {
2266                    // Log failed verification
2267                    let mut details = HashMap::new();
2268                    details.insert(
2269                        "action".to_string(),
2270                        "token_verification_failed".to_string(),
2271                    );
2272                    details.insert("error".to_string(), e.to_string());
2273
2274                    self.audit_logger
2275                        .log_event(
2276                            "authentication".to_string(),
2277                            "unknown".to_string(),
2278                            details,
2279                            AuditSeverity::Warning,
2280                        )
2281                        .await;
2282
2283                    Err(e)
2284                }
2285            }
2286        } else {
2287            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2288                "Authentication not enabled".to_string().into(),
2289            )))
2290        }
2291    }
2292
2293    /// Check if peer has permission for operation
2294    pub async fn check_permission(
2295        &self,
2296        peer_id: &PeerId,
2297        permission: &MCPPermission,
2298    ) -> Result<bool> {
2299        if let Some(security_manager) = &self.security_manager {
2300            security_manager.check_permission(peer_id, permission).await
2301        } else {
2302            // If security is disabled, allow all operations
2303            Ok(true)
2304        }
2305    }
2306
2307    /// Check rate limit for peer
2308    pub async fn check_rate_limit(&self, peer_id: &PeerId) -> Result<bool> {
2309        if let Some(security_manager) = &self.security_manager {
2310            let allowed = security_manager.check_rate_limit(peer_id).await?;
2311
2312            if !allowed {
2313                // Log rate limit violation
2314                let mut details = HashMap::new();
2315                details.insert("action".to_string(), "rate_limit_exceeded".to_string());
2316
2317                self.audit_logger
2318                    .log_event(
2319                        "rate_limiting".to_string(),
2320                        peer_id.clone(),
2321                        details,
2322                        AuditSeverity::Warning,
2323                    )
2324                    .await;
2325            }
2326
2327            Ok(allowed)
2328        } else {
2329            // If rate limiting is disabled, allow all requests
2330            Ok(true)
2331        }
2332    }
2333
2334    /// Grant permission to peer
2335    pub async fn grant_permission(
2336        &self,
2337        peer_id: &PeerId,
2338        permission: MCPPermission,
2339    ) -> Result<()> {
2340        if let Some(security_manager) = &self.security_manager {
2341            security_manager
2342                .grant_permission(peer_id, permission.clone())
2343                .await?;
2344
2345            // Log permission grant
2346            let mut details = HashMap::new();
2347            details.insert("action".to_string(), "permission_granted".to_string());
2348            details.insert("permission".to_string(), permission.as_str().to_string());
2349
2350            self.audit_logger
2351                .log_event(
2352                    "authorization".to_string(),
2353                    peer_id.clone(),
2354                    details,
2355                    AuditSeverity::Info,
2356                )
2357                .await;
2358
2359            Ok(())
2360        } else {
2361            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2362                "Security not enabled".to_string().into(),
2363            )))
2364        }
2365    }
2366
2367    /// Revoke permission from peer
2368    pub async fn revoke_permission(
2369        &self,
2370        peer_id: &PeerId,
2371        permission: &MCPPermission,
2372    ) -> Result<()> {
2373        if let Some(security_manager) = &self.security_manager {
2374            security_manager
2375                .revoke_permission(peer_id, permission)
2376                .await?;
2377
2378            // Log permission revocation
2379            let mut details = HashMap::new();
2380            details.insert("action".to_string(), "permission_revoked".to_string());
2381            details.insert("permission".to_string(), permission.as_str().to_string());
2382
2383            self.audit_logger
2384                .log_event(
2385                    "authorization".to_string(),
2386                    peer_id.clone(),
2387                    details,
2388                    AuditSeverity::Info,
2389                )
2390                .await;
2391
2392            Ok(())
2393        } else {
2394            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2395                "Security not enabled".to_string().into(),
2396            )))
2397        }
2398    }
2399
2400    /// Add trusted peer
2401    pub async fn add_trusted_peer(&self, peer_id: PeerId) -> Result<()> {
2402        if let Some(security_manager) = &self.security_manager {
2403            security_manager.add_trusted_peer(peer_id.clone()).await?;
2404
2405            // Log trusted peer addition
2406            let mut details = HashMap::new();
2407            details.insert("action".to_string(), "trusted_peer_added".to_string());
2408
2409            self.audit_logger
2410                .log_event(
2411                    "trust_management".to_string(),
2412                    peer_id,
2413                    details,
2414                    AuditSeverity::Info,
2415                )
2416                .await;
2417
2418            Ok(())
2419        } else {
2420            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2421                "Security not enabled".to_string().into(),
2422            )))
2423        }
2424    }
2425
2426    /// Check if peer is trusted
2427    pub async fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
2428        if let Some(security_manager) = &self.security_manager {
2429            security_manager.is_trusted_peer(peer_id).await
2430        } else {
2431            false
2432        }
2433    }
2434
2435    /// Set security policy for tool
2436    pub async fn set_tool_security_policy(
2437        &self,
2438        tool_name: String,
2439        level: SecurityLevel,
2440    ) -> Result<()> {
2441        if let Some(security_manager) = &self.security_manager {
2442            security_manager
2443                .set_tool_policy(tool_name.clone(), level.clone())
2444                .await?;
2445
2446            // Log policy change
2447            let mut details = HashMap::new();
2448            details.insert("action".to_string(), "tool_policy_set".to_string());
2449            details.insert("tool_name".to_string(), tool_name);
2450            details.insert("security_level".to_string(), format!("{level:?}"));
2451
2452            self.audit_logger
2453                .log_event(
2454                    "security_policy".to_string(),
2455                    "system".to_string(),
2456                    details,
2457                    AuditSeverity::Info,
2458                )
2459                .await;
2460
2461            Ok(())
2462        } else {
2463            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2464                "Security not enabled".to_string().into(),
2465            )))
2466        }
2467    }
2468
2469    /// Get security policy for tool
2470    pub async fn get_tool_security_policy(&self, tool_name: &str) -> SecurityLevel {
2471        if let Some(security_manager) = &self.security_manager {
2472            security_manager.get_tool_policy(tool_name).await
2473        } else {
2474            SecurityLevel::Public
2475        }
2476    }
2477
2478    /// Get peer security statistics
2479    pub async fn get_peer_security_stats(&self, peer_id: &PeerId) -> Option<PeerACL> {
2480        if let Some(security_manager) = &self.security_manager {
2481            security_manager.get_peer_stats(peer_id).await
2482        } else {
2483            None
2484        }
2485    }
2486
2487    /// Get recent security audit entries
2488    pub async fn get_security_audit(&self, limit: Option<usize>) -> Vec<SecurityAuditEntry> {
2489        self.audit_logger.get_recent_entries(limit).await
2490    }
2491
2492    /// Perform security housekeeping
2493    pub async fn security_cleanup(&self) -> Result<()> {
2494        if let Some(security_manager) = &self.security_manager {
2495            security_manager.cleanup().await?;
2496        }
2497        Ok(())
2498    }
2499
2500    /// Handle remote response
2501    async fn handle_remote_response(&self, message: P2PMCPMessage) -> Result<()> {
2502        // Find the waiting request handler
2503        let response_tx = {
2504            let mut handlers = self.request_handlers.write().await;
2505            handlers.remove(&message.message_id)
2506        };
2507
2508        if let Some(tx) = response_tx {
2509            let response = MCPResponse {
2510                request_id: message.message_id,
2511                message: message.payload,
2512                timestamp: SystemTime::now(),
2513                processing_time: Duration::from_millis(0), // TODO: Calculate actual processing time
2514            };
2515
2516            // Send response to waiting caller
2517            let _ = tx.send(response);
2518        } else {
2519            debug!(
2520                "Received response for unknown request: {}",
2521                message.message_id
2522            );
2523        }
2524
2525        Ok(())
2526    }
2527
2528    /// Announce local services to the network
2529    pub async fn announce_local_services(&self) -> Result<()> {
2530        if let Some(dht) = &self.dht {
2531            // Create local service announcement
2532            let local_service = self.create_local_service_announcement().await?;
2533
2534            // Store in DHT
2535            self.store_service_in_dht(&local_service, dht).await?;
2536
2537            // Broadcast service announcement to connected peers
2538            if let Some(network_sender) = &*self.network_sender.read().await {
2539                self.broadcast_service_announcement(&local_service, network_sender)
2540                    .await?;
2541            }
2542
2543            info!(
2544                "Announced local MCP service with {} tools",
2545                local_service.tools.len()
2546            );
2547        }
2548
2549        Ok(())
2550    }
2551
2552    /// Create a service announcement for our local node
2553    async fn create_local_service_announcement(&self) -> Result<MCPService> {
2554        let tools = self.tools.read().await;
2555        let tool_names: Vec<String> = tools.keys().cloned().collect();
2556
2557        let service = MCPService {
2558            service_id: format!("mcp-{}", self.config.server_name),
2559            node_id: "local".to_string(), // TODO: Get actual peer ID from network layer
2560            tools: tool_names,
2561            capabilities: MCPCapabilities {
2562                experimental: None,
2563                sampling: None,
2564                tools: Some(MCPToolsCapability {
2565                    list_changed: Some(true),
2566                }),
2567                prompts: None,
2568                resources: None,
2569                logging: None,
2570            },
2571            metadata: MCPServiceMetadata {
2572                name: self.config.server_name.clone(),
2573                version: self.config.server_version.clone(),
2574                description: Some("P2P MCP Service".to_string()),
2575                tags: vec!["p2p".to_string(), "mcp".to_string()],
2576                health_status: ServiceHealthStatus::Healthy,
2577                load_metrics: self.get_current_load_metrics().await,
2578            },
2579            registered_at: SystemTime::now(),
2580            endpoint: MCPEndpoint {
2581                protocol: "p2p".to_string(),
2582                address: "local".to_string(), // TODO: Get actual P2P address
2583                port: None,
2584                tls: true,
2585                auth_required: false,
2586            },
2587        };
2588
2589        Ok(service)
2590    }
2591
2592    /// Get current service load metrics
2593    async fn get_current_load_metrics(&self) -> ServiceLoadMetrics {
2594        let stats = self.stats.read().await;
2595
2596        ServiceLoadMetrics {
2597            active_requests: self.request_handlers.read().await.len() as u32,
2598            requests_per_second: stats.total_requests as f64 / 60.0, // Rough estimate
2599            avg_response_time_ms: stats.avg_response_time.as_millis() as f64,
2600            error_rate: if stats.total_requests > 0 {
2601                stats.total_errors as f64 / stats.total_requests as f64
2602            } else {
2603                0.0
2604            },
2605            cpu_usage: 0.0,  // TODO: Get actual CPU usage
2606            memory_usage: 0, // TODO: Get actual memory usage
2607        }
2608    }
2609
2610    /// Store service information in DHT
2611    async fn store_service_in_dht(
2612        &self,
2613        service: &MCPService,
2614        dht: &Arc<RwLock<DHT>>,
2615    ) -> Result<()> {
2616        // Store individual service record
2617        let service_key = Key::new(format!("mcp:service:{}", service.service_id).as_bytes());
2618        let service_data = serde_json::to_vec(service)
2619            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2620
2621        let dht_guard = dht.write().await;
2622        dht_guard
2623            .put(service_key.clone(), service_data)
2624            .await
2625            .map_err(|e| {
2626                P2PError::Dht(crate::error::DhtError::StoreFailed(
2627                    format!("{}: Failed to store service: {e}", service_key).into(),
2628                ))
2629            })?;
2630
2631        // Also add to services index
2632        let services_key = Key::new(b"mcp:services:index");
2633        let mut service_ids = match dht_guard.get(&services_key).await {
2634            Some(record) => {
2635                serde_json::from_slice::<Vec<String>>(&record.value).unwrap_or_default()
2636            }
2637            None => Vec::new(),
2638        };
2639
2640        if !service_ids.contains(&service.service_id) {
2641            service_ids.push(service.service_id.clone());
2642
2643            let index_data = serde_json::to_vec(&service_ids)
2644                .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2645
2646            dht_guard
2647                .put(services_key.clone(), index_data)
2648                .await
2649                .map_err(|e| {
2650                    P2PError::Dht(crate::error::DhtError::StoreFailed(
2651                        format!("{}: Failed to update services index: {e}", services_key).into(),
2652                    ))
2653                })?;
2654        }
2655
2656        Ok(())
2657    }
2658
2659    /// Broadcast service announcement to connected peers
2660    async fn broadcast_service_announcement(
2661        &self,
2662        service: &MCPService,
2663        network_sender: &Arc<dyn NetworkSender>,
2664    ) -> Result<()> {
2665        let announcement = P2PMCPMessage {
2666            message_type: P2PMCPMessageType::ServiceAdvertisement,
2667            message_id: uuid::Uuid::new_v4().to_string(),
2668            source_peer: network_sender.local_peer_id().clone(),
2669            target_peer: None, // Broadcast to all peers
2670            timestamp: SystemTime::now()
2671                .duration_since(std::time::UNIX_EPOCH)
2672                .unwrap_or_default()
2673                .as_secs(),
2674            payload: MCPMessage::ListToolsResult {
2675                tools: service
2676                    .tools
2677                    .iter()
2678                    .map(|tool_name| MCPTool {
2679                        name: tool_name.clone(),
2680                        description: format!("Tool from {}", service.metadata.name),
2681                        input_schema: json!({"type": "object"}),
2682                    })
2683                    .collect(),
2684                next_cursor: None,
2685            },
2686            ttl: 3,
2687        };
2688
2689        let _announcement_data = serde_json::to_vec(&announcement)
2690            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2691
2692        // TODO: Broadcast to all connected peers
2693        // For now, this would require getting the list of connected peers from the network layer
2694        debug!(
2695            "Service announcement prepared for broadcast: {} tools",
2696            service.tools.len()
2697        );
2698
2699        Ok(())
2700    }
2701
2702    /// Discover services from other peers
2703    pub async fn discover_remote_services(&self) -> Result<Vec<MCPService>> {
2704        if let Some(dht) = &self.dht {
2705            let services_key = Key::new(b"mcp:services:index");
2706            let dht_guard = dht.read().await;
2707
2708            let service_ids = match dht_guard.get(&services_key).await {
2709                Some(record) => {
2710                    serde_json::from_slice::<Vec<String>>(&record.value).unwrap_or_default()
2711                }
2712                None => {
2713                    debug!("No services index found in DHT");
2714                    return Ok(Vec::new());
2715                }
2716            };
2717
2718            let mut discovered_services = Vec::new();
2719
2720            for service_id in service_ids {
2721                let service_key = Key::new(format!("mcp:service:{service_id}").as_bytes());
2722
2723                if let Some(record) = dht_guard.get(&service_key).await {
2724                    match serde_json::from_slice::<MCPService>(&record.value) {
2725                        Ok(service) => {
2726                            // Don't include our own service
2727                            if service.service_id != format!("mcp-{}", self.config.server_name) {
2728                                discovered_services.push(service);
2729                            }
2730                        }
2731                        Err(e) => {
2732                            warn!("Failed to deserialize service {}: {}", service_id, e);
2733                        }
2734                    }
2735                }
2736            }
2737
2738            debug!(
2739                "Discovered {} remote MCP services",
2740                discovered_services.len()
2741            );
2742            Ok(discovered_services)
2743        } else {
2744            Ok(Vec::new())
2745        }
2746    }
2747
2748    /// Refresh service discovery and update remote services cache
2749    pub async fn refresh_service_discovery(&self) -> Result<()> {
2750        let discovered_services = self.discover_remote_services().await?;
2751
2752        // Update remote services cache
2753        {
2754            let mut remote_cache = self.remote_services.write().await;
2755            remote_cache.clear();
2756
2757            for service in discovered_services {
2758                remote_cache.insert(service.service_id.clone(), service);
2759            }
2760        }
2761
2762        // Also update local services registry
2763        {
2764            let local_service = self.create_local_service_announcement().await?;
2765            let mut local_cache = self.local_services.write().await;
2766            local_cache.insert(local_service.service_id.clone(), local_service);
2767        }
2768
2769        debug!("Service discovery refresh completed");
2770        Ok(())
2771    }
2772
2773    /// Get all known services (local + remote)
2774    pub async fn get_all_services(&self) -> Result<Vec<MCPService>> {
2775        let mut all_services = Vec::new();
2776
2777        // Add local services
2778        {
2779            let local_services = self.local_services.read().await;
2780            all_services.extend(local_services.values().cloned());
2781        }
2782
2783        // Add remote services
2784        {
2785            let remote_services = self.remote_services.read().await;
2786            all_services.extend(remote_services.values().cloned());
2787        }
2788
2789        Ok(all_services)
2790    }
2791
2792    /// Find services that provide a specific tool
2793    pub async fn find_services_with_tool(&self, tool_name: &str) -> Result<Vec<MCPService>> {
2794        let all_services = self.get_all_services().await?;
2795
2796        let matching_services = all_services
2797            .into_iter()
2798            .filter(|service| service.tools.contains(&tool_name.to_string()))
2799            .collect();
2800
2801        Ok(matching_services)
2802    }
2803
2804    /// Handle service advertisement
2805    pub async fn handle_service_advertisement(&self, message: P2PMCPMessage) -> Result<()> {
2806        debug!(
2807            "Received service advertisement from peer: {}",
2808            message.source_peer
2809        );
2810
2811        // Extract tools from the advertisement
2812        if let MCPMessage::ListToolsResult { tools, .. } = message.payload {
2813            // Create a service record from the advertisement
2814            let service = MCPService {
2815                service_id: format!("mcp-{}", message.source_peer),
2816                node_id: message.source_peer.clone(),
2817                tools: tools.iter().map(|t| t.name.clone()).collect(),
2818                capabilities: MCPCapabilities {
2819                    experimental: None,
2820                    sampling: None,
2821                    tools: Some(MCPToolsCapability {
2822                        list_changed: Some(true),
2823                    }),
2824                    prompts: None,
2825                    resources: None,
2826                    logging: None,
2827                },
2828                metadata: MCPServiceMetadata {
2829                    name: format!("Remote MCP Service - {}", message.source_peer),
2830                    version: "unknown".to_string(),
2831                    description: Some("Remote P2P MCP Service".to_string()),
2832                    tags: vec!["p2p".to_string(), "remote".to_string()],
2833                    health_status: ServiceHealthStatus::Healthy,
2834                    load_metrics: ServiceLoadMetrics {
2835                        active_requests: 0,
2836                        requests_per_second: 0.0,
2837                        avg_response_time_ms: 0.0,
2838                        error_rate: 0.0,
2839                        cpu_usage: 0.0,
2840                        memory_usage: 0,
2841                    },
2842                },
2843                registered_at: SystemTime::now(),
2844                endpoint: MCPEndpoint {
2845                    protocol: "p2p".to_string(),
2846                    address: message.source_peer.clone(),
2847                    port: None,
2848                    tls: true,
2849                    auth_required: false,
2850                },
2851            };
2852
2853            // Update remote services cache
2854            {
2855                let mut remote_services = self.remote_services.write().await;
2856                remote_services.insert(service.service_id.clone(), service.clone());
2857            }
2858
2859            // Store in DHT if available
2860            if let Some(dht) = &self.dht
2861                && let Err(e) = self.store_service_in_dht(&service, dht).await
2862            {
2863                warn!("Failed to store remote service in DHT: {}", e);
2864            }
2865
2866            info!(
2867                "Registered remote MCP service from {} with {} tools",
2868                message.source_peer,
2869                tools.len()
2870            );
2871        }
2872
2873        Ok(())
2874    }
2875
2876    /// Handle service discovery request
2877    pub async fn handle_service_discovery(
2878        &self,
2879        message: P2PMCPMessage,
2880    ) -> Result<Option<Vec<u8>>> {
2881        // Create service advertisement with our local services
2882        let local_services: Vec<MCPService> = {
2883            let services = self.local_services.read().await;
2884            services.values().cloned().collect()
2885        };
2886
2887        if !local_services.is_empty() {
2888            let advertisement = P2PMCPMessage {
2889                message_type: P2PMCPMessageType::ServiceAdvertisement,
2890                message_id: uuid::Uuid::new_v4().to_string(),
2891                source_peer: self.get_node_id_string(),
2892                target_peer: Some(message.source_peer),
2893                timestamp: SystemTime::now()
2894                    .duration_since(std::time::UNIX_EPOCH)
2895                    .map_err(|e| {
2896                        P2PError::Identity(crate::error::IdentityError::SystemTime(
2897                            format!("Time error: {e}").into(),
2898                        ))
2899                    })?
2900                    .as_secs(),
2901                payload: MCPMessage::ListToolsResult {
2902                    tools: local_services
2903                        .into_iter()
2904                        .flat_map(|s| {
2905                            s.tools.into_iter().map(|t| MCPTool {
2906                                name: t,
2907                                description: "Remote tool".to_string(),
2908                                input_schema: json!({"type": "object"}),
2909                            })
2910                        })
2911                        .collect(),
2912                    next_cursor: None,
2913                },
2914                ttl: message.ttl.saturating_sub(1),
2915            };
2916
2917            let response_data = serde_json::to_vec(&advertisement)
2918                .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2919
2920            Ok(Some(response_data))
2921        } else {
2922            Ok(None)
2923        }
2924    }
2925
2926    /// Shutdown the server
2927    pub async fn shutdown(&self) -> Result<()> {
2928        info!("Shutting down MCP server");
2929
2930        // Close all sessions
2931        {
2932            let mut sessions = self.sessions.write().await;
2933            for session in sessions.values_mut() {
2934                session.state = MCPSessionState::Terminated;
2935            }
2936            sessions.clear();
2937        }
2938
2939        // TODO: Cleanup tasks and channels
2940
2941        info!("MCP server shutdown complete");
2942        Ok(())
2943    }
2944}
2945
2946impl Tool {
2947    /// Create a tool builder
2948    pub fn builder(name: &str, description: &str, input_schema: Value) -> ToolBuilder {
2949        ToolBuilder {
2950            name: name.to_string(),
2951            description: description.to_string(),
2952            input_schema,
2953            handler: None,
2954            tags: Vec::new(),
2955        }
2956    }
2957}
2958
2959/// Builder for creating tools
2960pub struct ToolBuilder {
2961    name: String,
2962    description: String,
2963    input_schema: Value,
2964    handler: Option<Box<dyn ToolHandler + Send + Sync>>,
2965    tags: Vec<String>,
2966}
2967
2968impl ToolBuilder {
2969    /// Set tool handler
2970    pub fn handler<H: ToolHandler + Send + Sync + 'static>(mut self, handler: H) -> Self {
2971        self.handler = Some(Box::new(handler));
2972        self
2973    }
2974
2975    /// Add tags
2976    pub fn tags(mut self, tags: Vec<String>) -> Self {
2977        self.tags = tags;
2978        self
2979    }
2980
2981    /// Build the tool
2982    pub fn build(self) -> Result<Tool> {
2983        let handler = self.handler.ok_or_else(|| {
2984            P2PError::Mcp(crate::error::McpError::InvalidRequest(
2985                "Tool handler is required".to_string().into(),
2986            ))
2987        })?;
2988
2989        let definition = MCPTool {
2990            name: self.name,
2991            description: self.description,
2992            input_schema: self.input_schema,
2993        };
2994
2995        let metadata = ToolMetadata {
2996            created_at: SystemTime::now(),
2997            last_called: None,
2998            call_count: 0,
2999            avg_execution_time: Duration::from_millis(0),
3000            health_status: ToolHealthStatus::Healthy,
3001            tags: self.tags,
3002        };
3003
3004        Ok(Tool {
3005            definition,
3006            handler,
3007            metadata,
3008        })
3009    }
3010}
3011
3012/// Simple function-based tool handler
3013pub struct FunctionToolHandler<F> {
3014    function: F,
3015}
3016
3017impl<F, Fut> ToolHandler for FunctionToolHandler<F>
3018where
3019    F: Fn(Value) -> Fut + Send + Sync,
3020    Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
3021{
3022    fn execute(
3023        &self,
3024        arguments: Value,
3025    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>> {
3026        Box::pin((self.function)(arguments))
3027    }
3028}
3029
3030impl<F> FunctionToolHandler<F> {
3031    /// Create a new function-based tool handler
3032    pub fn new(function: F) -> Self {
3033        Self { function }
3034    }
3035}
3036
3037/// MCP service descriptor for discovery and routing
3038impl MCPService {
3039    /// Create a new MCP service descriptor
3040    pub fn new(service_id: String, node_id: PeerId) -> Self {
3041        Self {
3042            service_id,
3043            node_id,
3044            tools: Vec::new(),
3045            capabilities: MCPCapabilities {
3046                experimental: None,
3047                sampling: None,
3048                tools: Some(MCPToolsCapability {
3049                    list_changed: Some(true),
3050                }),
3051                prompts: None,
3052                resources: None,
3053                logging: None,
3054            },
3055            metadata: MCPServiceMetadata {
3056                name: "MCP Service".to_string(),
3057                version: "1.0.0".to_string(),
3058                description: None,
3059                tags: Vec::new(),
3060                health_status: ServiceHealthStatus::Healthy,
3061                load_metrics: ServiceLoadMetrics {
3062                    active_requests: 0,
3063                    requests_per_second: 0.0,
3064                    avg_response_time_ms: 0.0,
3065                    error_rate: 0.0,
3066                    cpu_usage: 0.0,
3067                    memory_usage: 0,
3068                },
3069            },
3070            registered_at: SystemTime::now(),
3071            endpoint: MCPEndpoint {
3072                protocol: "p2p".to_string(),
3073                address: "".to_string(),
3074                port: None,
3075                tls: false,
3076                auth_required: false,
3077            },
3078        }
3079    }
3080}
3081
3082impl Default for MCPCapabilities {
3083    fn default() -> Self {
3084        Self {
3085            experimental: None,
3086            sampling: None,
3087            tools: Some(MCPToolsCapability {
3088                list_changed: Some(true),
3089            }),
3090            prompts: Some(MCPPromptsCapability {
3091                list_changed: Some(true),
3092            }),
3093            resources: Some(MCPResourcesCapability {
3094                subscribe: Some(true),
3095                list_changed: Some(true),
3096            }),
3097            logging: Some(MCPLoggingCapability {
3098                levels: Some(vec![
3099                    MCPLogLevel::Debug,
3100                    MCPLogLevel::Info,
3101                    MCPLogLevel::Warning,
3102                    MCPLogLevel::Error,
3103                ]),
3104            }),
3105        }
3106    }
3107}
3108
3109#[cfg(test)]
3110mod tests {
3111    use super::*;
3112    use crate::dht::{DHT, DHTConfig, Key};
3113    use std::future::Future;
3114    use std::pin::Pin;
3115    use std::time::UNIX_EPOCH;
3116    use tokio::time::timeout;
3117
3118    /// Test implementation of ToolHandler for unit tests
3119    struct TestTool {
3120        name: String,
3121        should_error: bool,
3122        execution_time: Duration,
3123    }
3124
3125    impl TestTool {
3126        fn new(name: &str) -> Self {
3127            Self {
3128                name: name.to_string(),
3129                should_error: false,
3130                execution_time: Duration::from_millis(10),
3131            }
3132        }
3133
3134        fn with_error(mut self) -> Self {
3135            self.should_error = true;
3136            self
3137        }
3138
3139        fn with_execution_time(mut self, duration: Duration) -> Self {
3140            self.execution_time = duration;
3141            self
3142        }
3143    }
3144
3145    impl ToolHandler for TestTool {
3146        fn execute(
3147            &self,
3148            arguments: Value,
3149        ) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
3150            let should_error = self.should_error;
3151            let execution_time = self.execution_time;
3152            let name = self.name.clone();
3153
3154            Box::pin(async move {
3155                tokio::time::sleep(execution_time).await;
3156
3157                if should_error {
3158                    return Err(P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
3159                        format!("{}: Test error", name).into(),
3160                    ))
3161                    .into());
3162                }
3163
3164                // Echo back the arguments with a response marker
3165                Ok(json!({
3166                    "tool": name,
3167                    "arguments": arguments,
3168                    "result": "success"
3169                }))
3170            })
3171        }
3172
3173        fn validate(&self, arguments: &Value) -> Result<()> {
3174            if !arguments.is_object() {
3175                return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
3176                    "Arguments must be an object".to_string().into(),
3177                ))
3178                .into());
3179            }
3180            Ok(())
3181        }
3182
3183        fn get_requirements(&self) -> ToolRequirements {
3184            ToolRequirements {
3185                max_memory: Some(1024 * 1024), // 1MB
3186                max_execution_time: Some(Duration::from_secs(5)),
3187                required_capabilities: vec!["test".to_string()],
3188                requires_network: false,
3189                requires_filesystem: false,
3190            }
3191        }
3192    }
3193
3194    /// Helper function to create a test MCP server
3195    async fn create_test_mcp_server() -> MCPServer {
3196        let config = MCPServerConfig {
3197            server_name: "test_server".to_string(),
3198            server_version: "1.0.0".to_string(),
3199            enable_auth: false,
3200            enable_rate_limiting: false,
3201            max_concurrent_requests: 10,
3202            request_timeout: Duration::from_secs(30),
3203            enable_dht_discovery: true,
3204            rate_limit_rpm: 60,
3205            enable_logging: true,
3206            max_tool_execution_time: Duration::from_secs(30),
3207            tool_memory_limit: 100 * 1024 * 1024,
3208            health_monitor: HealthMonitorConfig::default(),
3209        };
3210
3211        MCPServer::new(config)
3212    }
3213
3214    /// Helper function to create a test tool
3215    fn create_test_tool(name: &str) -> Tool {
3216        Tool {
3217            definition: MCPTool {
3218                name: name.to_string(),
3219                description: format!("Test tool: {}", name).into(),
3220                input_schema: json!({
3221                    "type": "object",
3222                    "properties": {
3223                        "input": { "type": "string" }
3224                    }
3225                }),
3226            },
3227            handler: Box::new(TestTool::new(name)),
3228            metadata: ToolMetadata {
3229                created_at: SystemTime::now(),
3230                last_called: None,
3231                call_count: 0,
3232                avg_execution_time: Duration::from_millis(0),
3233                health_status: ToolHealthStatus::Healthy,
3234                tags: vec!["test".to_string()],
3235            },
3236        }
3237    }
3238
3239    /// Helper function to create a test DHT
3240    async fn create_test_dht() -> DHT {
3241        let local_id = Key::new(b"test_node_id");
3242        let config = DHTConfig::default();
3243        DHT::new(local_id, config)
3244    }
3245
3246    /// Helper function to create an MCP call context
3247    fn create_test_context(caller_id: PeerId) -> MCPCallContext {
3248        MCPCallContext {
3249            caller_id,
3250            timestamp: SystemTime::now(),
3251            timeout: Duration::from_secs(30),
3252            auth_info: None,
3253            metadata: HashMap::new(),
3254        }
3255    }
3256
3257    #[tokio::test]
3258    async fn test_mcp_server_creation() {
3259        let server = create_test_mcp_server().await;
3260        assert_eq!(server.config.server_name, "test_server");
3261        assert_eq!(server.config.server_version, "1.0.0");
3262        assert!(!server.config.enable_auth);
3263        assert!(!server.config.enable_rate_limiting);
3264    }
3265
3266    #[tokio::test]
3267    async fn test_tool_registration() -> Result<()> {
3268        let server = create_test_mcp_server().await;
3269        let tool = create_test_tool("test_calculator");
3270
3271        // Register the tool
3272        server.register_tool(tool).await?;
3273
3274        // Verify tool is registered
3275        let tools = server.tools.read().await;
3276        assert!(tools.contains_key("test_calculator"));
3277        assert_eq!(
3278            tools
3279                .get("test_calculator")
3280                .expect("Should succeed in test")
3281                .definition
3282                .name,
3283            "test_calculator"
3284        );
3285
3286        // Verify stats updated
3287        let stats = server.stats.read().await;
3288        assert_eq!(stats.total_tools, 1);
3289
3290        Ok(())
3291    }
3292
3293    #[tokio::test]
3294    async fn test_tool_registration_duplicate() -> Result<()> {
3295        let server = create_test_mcp_server().await;
3296        let tool1 = create_test_tool("duplicate_tool");
3297        let tool2 = create_test_tool("duplicate_tool");
3298
3299        // Register first tool
3300        server.register_tool(tool1).await?;
3301
3302        // Try to register duplicate - should fail
3303        let result = server.register_tool(tool2).await;
3304        assert!(result.is_err());
3305        assert!(
3306            result
3307                .unwrap_err()
3308                .to_string()
3309                .contains("Tool already exists")
3310        );
3311
3312        Ok(())
3313    }
3314
3315    #[tokio::test]
3316    async fn test_tool_validation() {
3317        let server = create_test_mcp_server().await;
3318
3319        // Test invalid tool name (empty)
3320        let mut invalid_tool = create_test_tool("");
3321        let result = server.validate_tool(&invalid_tool).await;
3322        assert!(result.is_err());
3323
3324        // Test invalid tool name (too long)
3325        invalid_tool.definition.name = "a".repeat(200);
3326        let result = server.validate_tool(&invalid_tool).await;
3327        assert!(result.is_err());
3328
3329        // Test invalid schema (not an object)
3330        let mut invalid_schema_tool = create_test_tool("valid_name");
3331        invalid_schema_tool.definition.input_schema = json!("not an object");
3332        let result = server.validate_tool(&invalid_schema_tool).await;
3333        assert!(result.is_err());
3334
3335        // Test valid tool
3336        let valid_tool = create_test_tool("valid_tool");
3337        let result = server.validate_tool(&valid_tool).await;
3338        assert!(result.is_ok());
3339    }
3340
3341    #[tokio::test]
3342    async fn test_tool_call_success() -> Result<()> {
3343        let server = create_test_mcp_server().await;
3344        let tool = create_test_tool("success_tool");
3345        server.register_tool(tool).await?;
3346
3347        let caller_id = "test_peer_123".to_string();
3348        let context = create_test_context(caller_id);
3349        let arguments = json!({"input": "test data"});
3350
3351        let result = server
3352            .call_tool("success_tool", arguments.clone(), context)
3353            .await?;
3354
3355        // Verify response structure
3356        assert_eq!(result["tool"], "success_tool");
3357        assert_eq!(result["arguments"], arguments);
3358        assert_eq!(result["result"], "success");
3359
3360        // Verify tool metadata updated
3361        let tools = server.tools.read().await;
3362        let tool_metadata = &tools
3363            .get("success_tool")
3364            .ok_or_else(|| {
3365                P2PError::Mcp(crate::error::McpError::ToolNotFound(
3366                    "Tool not found".into(),
3367                ))
3368            })?
3369            .metadata;
3370        assert_eq!(tool_metadata.call_count, 1);
3371        assert!(tool_metadata.last_called.is_some());
3372
3373        Ok(())
3374    }
3375
3376    #[tokio::test]
3377    async fn test_tool_call_nonexistent() -> Result<()> {
3378        let server = create_test_mcp_server().await;
3379        let caller_id = "test_peer_456".to_string();
3380        let context = create_test_context(caller_id);
3381        let arguments = json!({"input": "test"});
3382
3383        let result = server
3384            .call_tool("nonexistent_tool", arguments, context)
3385            .await;
3386        assert!(result.is_err());
3387        assert!(result.unwrap_err().to_string().contains("Tool not found"));
3388
3389        Ok(())
3390    }
3391
3392    #[tokio::test]
3393    async fn test_tool_call_handler_error() -> Result<()> {
3394        let server = create_test_mcp_server().await;
3395        let tool = Tool {
3396            definition: MCPTool {
3397                name: "error_tool".to_string(),
3398                description: "Tool that always errors".to_string(),
3399                input_schema: json!({"type": "object"}),
3400            },
3401            handler: Box::new(TestTool::new("error_tool").with_error()),
3402            metadata: ToolMetadata {
3403                created_at: SystemTime::now(),
3404                last_called: None,
3405                call_count: 0,
3406                avg_execution_time: Duration::from_millis(0),
3407                health_status: ToolHealthStatus::Healthy,
3408                tags: vec![],
3409            },
3410        };
3411
3412        server.register_tool(tool).await?;
3413
3414        let caller_id = "test_peer_error".to_string();
3415        let context = create_test_context(caller_id);
3416        let arguments = json!({"input": "test"});
3417
3418        let result = server.call_tool("error_tool", arguments, context).await;
3419        assert!(result.is_err());
3420        assert!(
3421            result
3422                .unwrap_err()
3423                .to_string()
3424                .contains("Test error from tool error_tool")
3425        );
3426
3427        Ok(())
3428    }
3429
3430    #[tokio::test]
3431    async fn test_tool_call_timeout() -> Result<()> {
3432        let server = create_test_mcp_server().await;
3433        let slow_tool = Tool {
3434            definition: MCPTool {
3435                name: "slow_tool".to_string(),
3436                description: "Tool that takes too long".to_string(),
3437                input_schema: json!({"type": "object"}),
3438            },
3439            handler: Box::new(
3440                TestTool::new("slow_tool").with_execution_time(Duration::from_secs(2)),
3441            ),
3442            metadata: ToolMetadata {
3443                created_at: SystemTime::now(),
3444                last_called: None,
3445                call_count: 0,
3446                avg_execution_time: Duration::from_millis(0),
3447                health_status: ToolHealthStatus::Healthy,
3448                tags: vec![],
3449            },
3450        };
3451
3452        server.register_tool(slow_tool).await?;
3453
3454        let caller_id = "test_peer_error".to_string();
3455        let context = create_test_context(caller_id);
3456        let arguments = json!({"input": "test"});
3457
3458        // Test with very short timeout
3459        let result = timeout(
3460            Duration::from_millis(100),
3461            server.call_tool("slow_tool", arguments, context),
3462        )
3463        .await;
3464
3465        assert!(result.is_err()); // Should timeout
3466
3467        Ok(())
3468    }
3469
3470    #[tokio::test]
3471    async fn test_tool_requirements() {
3472        let tool = TestTool::new("req_tool");
3473        let requirements = tool.get_requirements();
3474
3475        assert_eq!(requirements.max_memory, Some(1024 * 1024));
3476        assert_eq!(
3477            requirements.max_execution_time,
3478            Some(Duration::from_secs(5))
3479        );
3480        assert_eq!(requirements.required_capabilities, vec!["test"]);
3481        assert!(!requirements.requires_network);
3482        assert!(!requirements.requires_filesystem);
3483    }
3484
3485    #[tokio::test]
3486    async fn test_tool_validation_handler() {
3487        let tool = TestTool::new("validation_tool");
3488
3489        // Valid arguments (object)
3490        let valid_args = json!({"key": "value"});
3491        assert!(tool.validate(&valid_args).is_ok());
3492
3493        // Invalid arguments (not an object)
3494        let invalid_args = json!("not an object");
3495        assert!(tool.validate(&invalid_args).is_err());
3496
3497        let invalid_args = json!(123);
3498        assert!(tool.validate(&invalid_args).is_err());
3499    }
3500
3501    #[tokio::test]
3502    async fn test_tool_health_status() {
3503        let mut metadata = ToolMetadata {
3504            created_at: SystemTime::now(),
3505            last_called: None,
3506            call_count: 0,
3507            avg_execution_time: Duration::from_millis(0),
3508            health_status: ToolHealthStatus::Healthy,
3509            tags: vec![],
3510        };
3511
3512        // Test different health statuses
3513        assert_eq!(metadata.health_status, ToolHealthStatus::Healthy);
3514
3515        metadata.health_status = ToolHealthStatus::Degraded;
3516        assert_eq!(metadata.health_status, ToolHealthStatus::Degraded);
3517
3518        metadata.health_status = ToolHealthStatus::Unhealthy;
3519        assert_eq!(metadata.health_status, ToolHealthStatus::Unhealthy);
3520
3521        metadata.health_status = ToolHealthStatus::Disabled;
3522        assert_eq!(metadata.health_status, ToolHealthStatus::Disabled);
3523    }
3524
3525    #[tokio::test]
3526    async fn test_mcp_capabilities() {
3527        let server = create_test_mcp_server().await;
3528        let capabilities = server.get_server_capabilities().await;
3529
3530        assert!(capabilities.tools.is_some());
3531        assert!(capabilities.prompts.is_some());
3532        assert!(capabilities.resources.is_some());
3533        assert!(capabilities.logging.is_some());
3534
3535        let tools_cap = capabilities.tools.expect("Test assertion failed");
3536        assert_eq!(tools_cap.list_changed, Some(true));
3537
3538        let logging_cap = capabilities.logging.expect("Test assertion failed");
3539        let levels = logging_cap.levels.expect("Test assertion failed");
3540        assert!(levels.contains(&MCPLogLevel::Debug));
3541        assert!(levels.contains(&MCPLogLevel::Info));
3542        assert!(levels.contains(&MCPLogLevel::Warning));
3543        assert!(levels.contains(&MCPLogLevel::Error));
3544    }
3545
3546    #[tokio::test]
3547    async fn test_mcp_message_serialization() {
3548        // Test Initialize message
3549        let init_msg = MCPMessage::Initialize {
3550            protocol_version: MCP_VERSION.to_string(),
3551            capabilities: MCPCapabilities {
3552                experimental: None,
3553                sampling: None,
3554                tools: Some(MCPToolsCapability {
3555                    list_changed: Some(true),
3556                }),
3557                prompts: None,
3558                resources: None,
3559                logging: None,
3560            },
3561            client_info: MCPClientInfo {
3562                name: "test_client".to_string(),
3563                version: "1.0.0".to_string(),
3564            },
3565        };
3566
3567        let serialized = serde_json::to_string(&init_msg).expect("Test assertion failed");
3568        let deserialized: MCPMessage =
3569            serde_json::from_str(&serialized).expect("Test assertion failed");
3570
3571        match deserialized {
3572            MCPMessage::Initialize {
3573                protocol_version,
3574                client_info,
3575                ..
3576            } => {
3577                assert_eq!(protocol_version, MCP_VERSION);
3578                assert_eq!(client_info.name, "test_client");
3579                assert_eq!(client_info.version, "1.0.0");
3580            }
3581            _ => panic!("Wrong message type after deserialization"),
3582        }
3583    }
3584
3585    #[tokio::test]
3586    async fn test_mcp_content_types() {
3587        // Test text content
3588        let text_content = MCPContent::Text {
3589            text: "Hello, world!".to_string(),
3590        };
3591
3592        let serialized = serde_json::to_string(&text_content).expect("Test assertion failed");
3593        let deserialized: MCPContent =
3594            serde_json::from_str(&serialized).expect("Test assertion failed");
3595
3596        match deserialized {
3597            MCPContent::Text { text } => assert_eq!(text, "Hello, world!"),
3598            _ => panic!("Wrong content type"),
3599        }
3600
3601        // Test image content
3602        let image_content = MCPContent::Image {
3603            data: "base64data".to_string(),
3604            mime_type: "image/png".to_string(),
3605        };
3606
3607        let serialized = serde_json::to_string(&image_content).expect("Test assertion failed");
3608        let deserialized: MCPContent =
3609            serde_json::from_str(&serialized).expect("Test assertion failed");
3610
3611        match deserialized {
3612            MCPContent::Image { data, mime_type } => {
3613                assert_eq!(data, "base64data");
3614                assert_eq!(mime_type, "image/png");
3615            }
3616            _ => panic!("Wrong content type"),
3617        }
3618    }
3619
3620    #[tokio::test]
3621    async fn test_service_health_status() {
3622        let mut metrics = ServiceLoadMetrics {
3623            active_requests: 0,
3624            requests_per_second: 0.0,
3625            avg_response_time_ms: 0.0,
3626            error_rate: 0.0,
3627            cpu_usage: 0.0,
3628            memory_usage: 0,
3629        };
3630
3631        // Test healthy service
3632        let metadata = MCPServiceMetadata {
3633            name: "test_service".to_string(),
3634            version: "1.0.0".to_string(),
3635            description: Some("Test service".to_string()),
3636            tags: vec!["test".to_string()],
3637            health_status: ServiceHealthStatus::Healthy,
3638            load_metrics: metrics.clone(),
3639        };
3640
3641        assert_eq!(metadata.health_status, ServiceHealthStatus::Healthy);
3642
3643        // Test different health statuses
3644        metrics.error_rate = 0.5; // 50% error rate
3645        let degraded_metadata = MCPServiceMetadata {
3646            health_status: ServiceHealthStatus::Degraded,
3647            load_metrics: metrics.clone(),
3648            ..metadata.clone()
3649        };
3650
3651        assert_eq!(
3652            degraded_metadata.health_status,
3653            ServiceHealthStatus::Degraded
3654        );
3655
3656        let unhealthy_metadata = MCPServiceMetadata {
3657            health_status: ServiceHealthStatus::Unhealthy,
3658            ..metadata.clone()
3659        };
3660
3661        assert_eq!(
3662            unhealthy_metadata.health_status,
3663            ServiceHealthStatus::Unhealthy
3664        );
3665    }
3666
3667    #[tokio::test]
3668    async fn test_p2p_mcp_message() {
3669        let source_peer = "source_peer_123".to_string();
3670        let target_peer = "target_peer_456".to_string();
3671
3672        let p2p_message = P2PMCPMessage {
3673            message_type: P2PMCPMessageType::Request,
3674            message_id: uuid::Uuid::new_v4().to_string(),
3675            source_peer: source_peer.clone(),
3676            target_peer: Some(target_peer.clone()),
3677            timestamp: SystemTime::now()
3678                .duration_since(UNIX_EPOCH)
3679                .unwrap_or_else(|_| Duration::from_secs(0))
3680                .as_secs(),
3681            payload: MCPMessage::ListTools { cursor: None },
3682            ttl: 10,
3683        };
3684
3685        // Test serialization
3686        let serialized = serde_json::to_string(&p2p_message).expect("Test assertion failed");
3687        let deserialized: P2PMCPMessage =
3688            serde_json::from_str(&serialized).expect("Test assertion failed");
3689
3690        assert_eq!(deserialized.message_type, P2PMCPMessageType::Request);
3691        assert_eq!(deserialized.source_peer, source_peer);
3692        assert_eq!(deserialized.target_peer, Some(target_peer));
3693        assert_eq!(deserialized.ttl, 10);
3694
3695        match deserialized.payload {
3696            MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3697            _ => panic!("Wrong message payload type"),
3698        }
3699    }
3700
3701    #[tokio::test]
3702    async fn test_tool_requirements_default() {
3703        let default_requirements = ToolRequirements::default();
3704
3705        assert_eq!(default_requirements.max_memory, Some(100 * 1024 * 1024));
3706        assert_eq!(
3707            default_requirements.max_execution_time,
3708            Some(Duration::from_secs(30))
3709        );
3710        assert!(default_requirements.required_capabilities.is_empty());
3711        assert!(!default_requirements.requires_network);
3712        assert!(!default_requirements.requires_filesystem);
3713    }
3714
3715    #[tokio::test]
3716    async fn test_mcp_server_stats() -> Result<()> {
3717        let server = create_test_mcp_server().await;
3718
3719        // Initial stats should be zero
3720        let stats = server.stats.read().await;
3721        assert_eq!(stats.total_tools, 0);
3722        assert_eq!(stats.total_requests, 0);
3723        assert_eq!(stats.total_responses, 0);
3724        assert_eq!(stats.total_errors, 0);
3725
3726        drop(stats);
3727
3728        // Register a tool and verify stats update
3729        let tool = create_test_tool("stats_test_tool");
3730        server.register_tool(tool).await?;
3731
3732        let stats = server.stats.read().await;
3733        assert_eq!(stats.total_tools, 1);
3734
3735        Ok(())
3736    }
3737
3738    #[tokio::test]
3739    async fn test_log_levels() {
3740        // Test all log levels serialize/deserialize correctly
3741        let levels = vec![
3742            MCPLogLevel::Debug,
3743            MCPLogLevel::Info,
3744            MCPLogLevel::Notice,
3745            MCPLogLevel::Warning,
3746            MCPLogLevel::Error,
3747            MCPLogLevel::Critical,
3748            MCPLogLevel::Alert,
3749            MCPLogLevel::Emergency,
3750        ];
3751
3752        for level in levels {
3753            let serialized = serde_json::to_string(&level).expect("Test assertion failed");
3754            let deserialized: MCPLogLevel =
3755                serde_json::from_str(&serialized).expect("Test assertion failed");
3756            assert_eq!(level as u8, deserialized as u8);
3757        }
3758    }
3759
3760    #[tokio::test]
3761    async fn test_mcp_endpoint() {
3762        let endpoint = MCPEndpoint {
3763            protocol: "p2p".to_string(),
3764            address: "127.0.0.1".to_string(),
3765            port: Some(9000),
3766            tls: true,
3767            auth_required: true,
3768        };
3769
3770        let serialized = serde_json::to_string(&endpoint).expect("Test assertion failed");
3771        let deserialized: MCPEndpoint =
3772            serde_json::from_str(&serialized).expect("Test assertion failed");
3773
3774        assert_eq!(deserialized.protocol, "p2p");
3775        assert_eq!(deserialized.address, "127.0.0.1");
3776        assert_eq!(deserialized.port, Some(9000));
3777        assert!(deserialized.tls);
3778        assert!(deserialized.auth_required);
3779    }
3780
3781    #[tokio::test]
3782    async fn test_mcp_service_metadata() {
3783        let load_metrics = ServiceLoadMetrics {
3784            active_requests: 5,
3785            requests_per_second: 10.5,
3786            avg_response_time_ms: 250.0,
3787            error_rate: 0.01,
3788            cpu_usage: 45.5,
3789            memory_usage: 1024 * 1024 * 100, // 100MB
3790        };
3791
3792        let metadata = MCPServiceMetadata {
3793            name: "test_service".to_string(),
3794            version: "2.1.0".to_string(),
3795            description: Some("A test service for unit testing".to_string()),
3796            tags: vec!["test".to_string(), "unit".to_string(), "mcp".to_string()],
3797            health_status: ServiceHealthStatus::Healthy,
3798            load_metrics,
3799        };
3800
3801        // Test serialization
3802        let serialized = serde_json::to_string(&metadata).expect("Test assertion failed");
3803        let deserialized: MCPServiceMetadata =
3804            serde_json::from_str(&serialized).expect("Test assertion failed");
3805
3806        assert_eq!(deserialized.name, "test_service");
3807        assert_eq!(deserialized.version, "2.1.0");
3808        assert_eq!(
3809            deserialized.description,
3810            Some("A test service for unit testing".to_string())
3811        );
3812        assert_eq!(deserialized.tags, vec!["test", "unit", "mcp"]);
3813        assert_eq!(deserialized.health_status, ServiceHealthStatus::Healthy);
3814        assert_eq!(deserialized.load_metrics.active_requests, 5);
3815        assert_eq!(deserialized.load_metrics.requests_per_second, 10.5);
3816    }
3817
3818    #[tokio::test]
3819    async fn test_function_tool_handler() -> Result<()> {
3820        // Test function tool handler creation and execution
3821        let handler = FunctionToolHandler::new(|args: Value| async move {
3822            let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("world");
3823            Ok(json!({"greeting": format!("Hello, {}!", name)}))
3824        });
3825
3826        let args = json!({"name": "Alice"});
3827        let result = handler.execute(args).await?;
3828        assert_eq!(result["greeting"], "Hello, Alice!");
3829
3830        // Test with missing argument
3831        let empty_args = json!({});
3832        let result = handler.execute(empty_args).await?;
3833        assert_eq!(result["greeting"], "Hello, world!");
3834
3835        Ok(())
3836    }
3837
3838    #[tokio::test]
3839    async fn test_mcp_service_creation() {
3840        let service_id = "test_service_123".to_string();
3841        let node_id = "test_node_789".to_string();
3842
3843        let service = MCPService::new(service_id.clone(), node_id.clone());
3844
3845        assert_eq!(service.service_id, service_id);
3846        assert_eq!(service.node_id, node_id);
3847        assert!(service.tools.is_empty());
3848        assert_eq!(service.metadata.name, "MCP Service");
3849        assert_eq!(service.metadata.version, "1.0.0");
3850        assert_eq!(service.metadata.health_status, ServiceHealthStatus::Healthy);
3851        assert_eq!(service.endpoint.protocol, "p2p");
3852        assert!(!service.endpoint.tls);
3853        assert!(!service.endpoint.auth_required);
3854    }
3855
3856    #[tokio::test]
3857    async fn test_mcp_capabilities_default() {
3858        let capabilities = MCPCapabilities::default();
3859
3860        assert!(capabilities.tools.is_some());
3861        assert!(capabilities.prompts.is_some());
3862        assert!(capabilities.resources.is_some());
3863        assert!(capabilities.logging.is_some());
3864
3865        let tools_cap = capabilities.tools.expect("Test assertion failed");
3866        assert_eq!(tools_cap.list_changed, Some(true));
3867
3868        let resources_cap = capabilities.resources.expect("Test assertion failed");
3869        assert_eq!(resources_cap.subscribe, Some(true));
3870        assert_eq!(resources_cap.list_changed, Some(true));
3871
3872        let logging_cap = capabilities.logging.expect("Test assertion failed");
3873        let levels = logging_cap.levels.expect("Test assertion failed");
3874        assert!(levels.contains(&MCPLogLevel::Debug));
3875        assert!(levels.contains(&MCPLogLevel::Info));
3876        assert!(levels.contains(&MCPLogLevel::Warning));
3877        assert!(levels.contains(&MCPLogLevel::Error));
3878    }
3879
3880    #[tokio::test]
3881    async fn test_mcp_request_creation() {
3882        let source_peer = "source_peer_123".to_string();
3883        let target_peer = "target_peer_456".to_string();
3884
3885        let request = MCPRequest {
3886            request_id: uuid::Uuid::new_v4().to_string(),
3887            source_peer: source_peer.clone(),
3888            target_peer: target_peer.clone(),
3889            message: MCPMessage::ListTools { cursor: None },
3890            timestamp: SystemTime::now(),
3891            timeout: Duration::from_secs(30),
3892            auth_token: Some("test_token".to_string()),
3893        };
3894
3895        assert_eq!(request.source_peer, source_peer);
3896        assert_eq!(request.target_peer, target_peer);
3897        assert_eq!(request.timeout, Duration::from_secs(30));
3898        assert_eq!(request.auth_token, Some("test_token".to_string()));
3899
3900        match request.message {
3901            MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3902            _ => panic!("Wrong message type"),
3903        }
3904    }
3905
3906    #[tokio::test]
3907    async fn test_p2p_message_types() {
3908        // Test all P2P message types
3909        assert_eq!(P2PMCPMessageType::Request, P2PMCPMessageType::Request);
3910        assert_eq!(P2PMCPMessageType::Response, P2PMCPMessageType::Response);
3911        assert_eq!(
3912            P2PMCPMessageType::ServiceAdvertisement,
3913            P2PMCPMessageType::ServiceAdvertisement
3914        );
3915        assert_eq!(
3916            P2PMCPMessageType::ServiceDiscovery,
3917            P2PMCPMessageType::ServiceDiscovery
3918        );
3919
3920        // Test serialization of each type
3921        for msg_type in [
3922            P2PMCPMessageType::Request,
3923            P2PMCPMessageType::Response,
3924            P2PMCPMessageType::ServiceAdvertisement,
3925            P2PMCPMessageType::ServiceDiscovery,
3926        ] {
3927            let serialized = serde_json::to_string(&msg_type).expect("Test assertion failed");
3928            let deserialized: P2PMCPMessageType =
3929                serde_json::from_str(&serialized).expect("Test assertion failed");
3930            assert_eq!(msg_type, deserialized);
3931        }
3932    }
3933}