saorsa_core/
mcp.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Model Context Protocol (MCP) Server Implementation
15//!
16//! This module provides a fully-featured MCP server that integrates with the P2P network,
17//! enabling AI agents to discover and call tools across the distributed network.
18//!
19//! The implementation includes:
20//! - MCP message routing over P2P transport
21//! - Tool registration and discovery through DHT
22//! - Security and authentication for remote calls
23//! - Service health monitoring and load balancing
24
25#![allow(missing_docs)]
26
27pub mod security;
28
29use crate::dht::{DHT, DhtKey, Key};
30use crate::{P2PError, PeerId, Result};
31use rand;
32use serde::{Deserialize, Serialize};
33use serde_json::{Value, json};
34use std::collections::HashMap;
35use std::sync::Arc;
36use std::time::{Duration, Instant, SystemTime};
37use tokio::sync::{RwLock, mpsc, oneshot};
38use tokio::time::timeout;
39use tracing::{debug, info, warn};
40
41pub use security::*;
42
43/// MCP protocol version
44pub const MCP_VERSION: &str = "2024-11-05";
45
46/// Maximum message size for MCP calls (1MB)
47pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
48
49/// Default timeout for MCP calls
50pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);
51
52/// MCP protocol identifier for P2P messaging
53pub const MCP_PROTOCOL: &str = "/p2p-foundation/mcp/1.0.0";
54
55/// Network message sender trait for MCP server
56#[async_trait::async_trait]
57pub trait NetworkSender: Send + Sync {
58    /// Send a message to a specific peer via the P2P network
59    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
60
61    /// Get our local peer ID
62    fn local_peer_id(&self) -> &PeerId;
63}
64
65/// Message sender function type for simplified network integration
66pub type MessageSender = Arc<dyn Fn(&PeerId, &str, Vec<u8>) -> Result<()> + Send + Sync>;
67
68/// Service discovery refresh interval
69pub const SERVICE_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
70
71/// MCP message types
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(tag = "type", rename_all = "snake_case")]
74pub enum MCPMessage {
75    /// Initialize MCP session
76    Initialize {
77        /// MCP protocol version being used
78        protocol_version: String,
79        /// Client capabilities for this session
80        capabilities: MCPCapabilities,
81        /// Information about the connecting client
82        client_info: MCPClientInfo,
83    },
84    /// Initialize response
85    InitializeResult {
86        /// MCP protocol version the server supports
87        protocol_version: String,
88        /// Server capabilities for this session
89        capabilities: MCPCapabilities,
90        /// Information about the MCP server
91        server_info: MCPServerInfo,
92    },
93    /// List available tools
94    ListTools {
95        /// Pagination cursor for large tool lists
96        cursor: Option<String>,
97    },
98    /// List tools response
99    ListToolsResult {
100        /// Available tools on this server
101        tools: Vec<MCPTool>,
102        /// Next pagination cursor if more tools available
103        next_cursor: Option<String>,
104    },
105    /// Call a tool
106    CallTool {
107        /// Name of the tool to call
108        name: String,
109        /// Arguments to pass to the tool
110        arguments: Value,
111    },
112    /// Tool call response
113    CallToolResult {
114        /// Content returned by the tool
115        content: Vec<MCPContent>,
116        /// Whether the call resulted in an error
117        is_error: bool,
118    },
119    /// List available prompts
120    ListPrompts {
121        /// Pagination cursor for large prompt lists
122        cursor: Option<String>,
123    },
124    /// List prompts response
125    ListPromptsResult {
126        /// Available prompts on this server
127        prompts: Vec<MCPPrompt>,
128        /// Next pagination cursor if more prompts available
129        next_cursor: Option<String>,
130    },
131    /// Get a prompt
132    GetPrompt {
133        /// Name of the prompt to retrieve
134        name: String,
135        /// Arguments to customize the prompt
136        arguments: Option<Value>,
137    },
138    /// Get prompt response
139    GetPromptResult {
140        /// Description of the prompt
141        description: Option<String>,
142        /// Prompt messages/content
143        messages: Vec<MCPPromptMessage>,
144    },
145    /// List available resources
146    ListResources {
147        /// Pagination cursor for large resource lists
148        cursor: Option<String>,
149    },
150    /// List resources response
151    ListResourcesResult {
152        /// Available resources on this server
153        resources: Vec<MCPResource>,
154        /// Next pagination cursor if more resources available
155        next_cursor: Option<String>,
156    },
157    /// Read a resource
158    ReadResource {
159        /// URI of the resource to read
160        uri: String,
161    },
162    /// Read resource response
163    ReadResourceResult {
164        /// Contents of the requested resource
165        contents: Vec<MCPResourceContent>,
166    },
167    /// Subscribe to resource
168    SubscribeResource {
169        /// URI of the resource to subscribe to
170        uri: String,
171    },
172    /// Unsubscribe from resource
173    UnsubscribeResource {
174        /// URI of the resource to unsubscribe from
175        uri: String,
176    },
177    /// Resource updated notification
178    ResourceUpdated {
179        /// URI of the resource that was updated
180        uri: String,
181    },
182    /// List logs
183    ListLogs {
184        /// Pagination cursor for large log lists
185        cursor: Option<String>,
186    },
187    /// List logs response
188    ListLogsResult {
189        /// Log entries available on this server
190        logs: Vec<MCPLogEntry>,
191        /// Next pagination cursor if more logs available
192        next_cursor: Option<String>,
193    },
194    /// Set log level
195    SetLogLevel {
196        /// Log level to set for the server
197        level: MCPLogLevel,
198    },
199    /// Error response
200    Error {
201        /// Error code identifying the type of error
202        code: i32,
203        /// Human-readable error message
204        message: String,
205        /// Optional additional error data
206        data: Option<Value>,
207    },
208}
209
210/// MCP capabilities
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct MCPCapabilities {
213    /// Experimental capabilities
214    pub experimental: Option<Value>,
215    /// Sampling capability
216    pub sampling: Option<Value>,
217    /// Tools capability
218    pub tools: Option<MCPToolsCapability>,
219    /// Prompts capability
220    pub prompts: Option<MCPPromptsCapability>,
221    /// Resources capability
222    pub resources: Option<MCPResourcesCapability>,
223    /// Logging capability
224    pub logging: Option<MCPLoggingCapability>,
225}
226
227/// Tools capability configuration
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct MCPToolsCapability {
230    /// Whether tools are supported
231    pub list_changed: Option<bool>,
232}
233
234/// Prompts capability configuration
235#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct MCPPromptsCapability {
237    /// Whether prompts are supported
238    pub list_changed: Option<bool>,
239}
240
241/// Resources capability configuration
242#[derive(Debug, Clone, Serialize, Deserialize)]
243pub struct MCPResourcesCapability {
244    /// Whether resources are supported
245    pub subscribe: Option<bool>,
246    /// Whether resource listing is supported
247    pub list_changed: Option<bool>,
248}
249
250/// Logging capability configuration
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct MCPLoggingCapability {
253    /// Available log levels
254    pub levels: Option<Vec<MCPLogLevel>>,
255}
256
257/// MCP client information
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct MCPClientInfo {
260    /// Client name
261    pub name: String,
262    /// Client version
263    pub version: String,
264}
265
266/// MCP server information
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct MCPServerInfo {
269    /// Server name
270    pub name: String,
271    /// Server version
272    pub version: String,
273}
274
275/// MCP tool definition
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct MCPTool {
278    /// Tool name
279    pub name: String,
280    /// Tool description
281    pub description: String,
282    /// Input schema (JSON Schema)
283    pub input_schema: Value,
284}
285
286/// MCP tool implementation
287pub struct Tool {
288    /// Tool definition
289    pub definition: MCPTool,
290    /// Tool handler function
291    pub handler: Box<dyn ToolHandler + Send + Sync>,
292    /// Tool metadata
293    pub metadata: ToolMetadata,
294}
295
296/// Tool metadata for tracking and optimization
297#[derive(Debug, Clone)]
298pub struct ToolMetadata {
299    /// Tool creation time
300    pub created_at: SystemTime,
301    /// Last call time
302    pub last_called: Option<SystemTime>,
303    /// Total number of calls
304    pub call_count: u64,
305    /// Average execution time
306    pub avg_execution_time: Duration,
307    /// Tool health status
308    pub health_status: ToolHealthStatus,
309    /// Tags for categorization
310    pub tags: Vec<String>,
311}
312
313/// Tool health status
314#[derive(Debug, Clone, Copy, PartialEq)]
315pub enum ToolHealthStatus {
316    /// Tool is healthy and responsive
317    Healthy,
318    /// Tool is experiencing issues
319    Degraded,
320    /// Tool is not responding
321    Unhealthy,
322    /// Tool is disabled
323    Disabled,
324}
325
326/// Health monitoring configuration
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct HealthMonitorConfig {
329    /// Interval between health checks
330    pub health_check_interval: Duration,
331    /// Timeout for health check requests
332    pub health_check_timeout: Duration,
333    /// Number of consecutive failures before marking unhealthy
334    pub failure_threshold: u32,
335    /// Number of consecutive successes to mark healthy again
336    pub success_threshold: u32,
337    /// Interval for sending heartbeats
338    pub heartbeat_interval: Duration,
339    /// Maximum age of last heartbeat before considering service stale
340    pub heartbeat_timeout: Duration,
341    /// Whether to enable health monitoring
342    pub enabled: bool,
343}
344
345impl Default for HealthMonitorConfig {
346    fn default() -> Self {
347        Self {
348            health_check_interval: Duration::from_secs(30),
349            health_check_timeout: Duration::from_secs(5),
350            failure_threshold: 3,
351            success_threshold: 2,
352            heartbeat_interval: Duration::from_secs(60),
353            heartbeat_timeout: Duration::from_secs(300), // 5 minutes
354            enabled: true,
355        }
356    }
357}
358
359/// Service health information
360#[derive(Debug, Clone, Serialize, Deserialize)]
361pub struct ServiceHealth {
362    /// Service identifier
363    pub service_id: String,
364    /// Current health status
365    pub status: ServiceHealthStatus,
366    /// Last successful health check
367    pub last_health_check: Option<SystemTime>,
368    /// Last heartbeat received
369    pub last_heartbeat: Option<SystemTime>,
370    /// Consecutive failure count
371    pub failure_count: u32,
372    /// Consecutive success count
373    pub success_count: u32,
374    /// Average response time for health checks
375    pub avg_response_time: Duration,
376    /// Error message if unhealthy
377    pub error_message: Option<String>,
378    /// Health check history (last 10 checks)
379    pub health_history: Vec<HealthCheckResult>,
380}
381
382/// Result of a health check
383#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct HealthCheckResult {
385    /// Timestamp of the check
386    pub timestamp: SystemTime,
387    /// Whether the check was successful
388    pub success: bool,
389    /// Response time
390    pub response_time: Duration,
391    /// Error message if failed
392    pub error_message: Option<String>,
393}
394
395/// Heartbeat message
396#[derive(Debug, Clone, Serialize, Deserialize)]
397pub struct Heartbeat {
398    /// Service ID sending the heartbeat
399    pub service_id: String,
400    /// Peer ID of the service
401    pub peer_id: PeerId,
402    /// Timestamp when heartbeat was sent
403    pub timestamp: SystemTime,
404    /// Current service load (0.0 to 1.0)
405    pub load: f32,
406    /// Available tools
407    pub available_tools: Vec<String>,
408    /// Service capabilities
409    pub capabilities: MCPCapabilities,
410}
411
412/// Health monitoring events
413#[derive(Debug, Clone)]
414pub enum HealthEvent {
415    /// Service became healthy
416    ServiceHealthy { service_id: String, peer_id: PeerId },
417    /// Service became unhealthy
418    ServiceUnhealthy {
419        service_id: String,
420        peer_id: PeerId,
421        error: String,
422    },
423    /// Service status changed to degraded
424    ServiceDegraded {
425        service_id: String,
426        peer_id: PeerId,
427        reason: String,
428    },
429    /// Heartbeat received
430    HeartbeatReceived {
431        service_id: String,
432        peer_id: PeerId,
433        load: f32,
434    },
435    /// Heartbeat timeout
436    HeartbeatTimeout { service_id: String, peer_id: PeerId },
437}
438
439/// Tool handler trait
440pub trait ToolHandler {
441    /// Execute the tool with given arguments
442    fn execute(
443        &self,
444        arguments: Value,
445    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>>;
446
447    /// Validate tool arguments
448    fn validate(&self, arguments: &Value) -> Result<()> {
449        // Default implementation - no validation
450        let _ = arguments;
451        Ok(())
452    }
453
454    /// Get tool resource requirements
455    fn get_requirements(&self) -> ToolRequirements {
456        ToolRequirements::default()
457    }
458}
459
460/// Tool resource requirements
461#[derive(Debug, Clone)]
462pub struct ToolRequirements {
463    /// Maximum memory usage in bytes
464    pub max_memory: Option<u64>,
465    /// Maximum execution time allowed for tool calls
466    pub max_execution_time: Option<Duration>,
467    /// Required capabilities that must be available
468    pub required_capabilities: Vec<String>,
469    /// Whether this tool requires network access
470    pub requires_network: bool,
471    /// Whether this tool requires file system access
472    pub requires_filesystem: bool,
473}
474
475impl Default for ToolRequirements {
476    fn default() -> Self {
477        Self {
478            max_memory: Some(100 * 1024 * 1024), // 100MB default
479            max_execution_time: Some(Duration::from_secs(30)),
480            required_capabilities: Vec::new(),
481            requires_network: false,
482            requires_filesystem: false,
483        }
484    }
485}
486
487/// MCP content types
488#[derive(Debug, Clone, Serialize, Deserialize)]
489#[serde(tag = "type", rename_all = "snake_case")]
490pub enum MCPContent {
491    /// Text content
492    Text {
493        /// The text content
494        text: String,
495    },
496    /// Image content
497    Image {
498        /// Base64-encoded image data
499        data: String,
500        /// MIME type of the image
501        mime_type: String,
502    },
503    /// Resource content
504    Resource {
505        /// Reference to an MCP resource
506        resource: MCPResourceReference,
507    },
508}
509
510/// MCP resource reference
511#[derive(Debug, Clone, Serialize, Deserialize)]
512pub struct MCPResourceReference {
513    /// Resource URI
514    pub uri: String,
515    /// Resource type
516    pub type_: Option<String>,
517}
518
519/// MCP prompt definition
520#[derive(Debug, Clone, Serialize, Deserialize)]
521pub struct MCPPrompt {
522    /// Prompt name
523    pub name: String,
524    /// Prompt description
525    pub description: Option<String>,
526    /// Prompt arguments schema
527    pub arguments: Option<Value>,
528}
529
530/// MCP prompt message
531#[derive(Debug, Clone, Serialize, Deserialize)]
532pub struct MCPPromptMessage {
533    /// Message role
534    pub role: MCPRole,
535    /// Message content
536    pub content: MCPContent,
537}
538
539/// MCP message roles
540#[derive(Debug, Clone, Serialize, Deserialize)]
541#[serde(rename_all = "snake_case")]
542pub enum MCPRole {
543    /// User message
544    User,
545    /// Assistant message
546    Assistant,
547    /// System message
548    System,
549}
550
551/// MCP resource definition
552#[derive(Debug, Clone, Serialize, Deserialize)]
553pub struct MCPResource {
554    /// Resource URI
555    pub uri: String,
556    /// Resource name
557    pub name: String,
558    /// Resource description
559    pub description: Option<String>,
560    /// Resource MIME type
561    pub mime_type: Option<String>,
562}
563
564/// MCP resource content
565#[derive(Debug, Clone, Serialize, Deserialize)]
566pub struct MCPResourceContent {
567    /// Content URI
568    pub uri: String,
569    /// Content MIME type
570    pub mime_type: String,
571    /// Content data
572    pub text: Option<String>,
573    /// Binary content (base64 encoded)
574    pub blob: Option<String>,
575}
576
577/// MCP log levels
578#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
579#[serde(rename_all = "snake_case")]
580pub enum MCPLogLevel {
581    /// Debug level
582    Debug,
583    /// Info level
584    Info,
585    /// Notice level
586    Notice,
587    /// Warning level
588    Warning,
589    /// Error level
590    Error,
591    /// Critical level
592    Critical,
593    /// Alert level
594    Alert,
595    /// Emergency level
596    Emergency,
597}
598
599/// MCP log entry
600#[derive(Debug, Clone, Serialize, Deserialize)]
601pub struct MCPLogEntry {
602    /// Log level
603    pub level: MCPLogLevel,
604    /// Log message
605    pub data: Value,
606    /// Logger name
607    pub logger: Option<String>,
608}
609
610/// MCP service descriptor for discovery
611#[derive(Debug, Clone, Serialize, Deserialize)]
612pub struct MCPService {
613    /// Service identifier
614    pub service_id: String,
615    /// Node providing the service
616    pub node_id: PeerId,
617    /// Available tools
618    pub tools: Vec<String>,
619    /// Service capabilities
620    pub capabilities: MCPCapabilities,
621    /// Service metadata
622    pub metadata: MCPServiceMetadata,
623    /// Service registration time
624    pub registered_at: SystemTime,
625    /// Service endpoint information
626    pub endpoint: MCPEndpoint,
627}
628
629/// MCP service metadata
630#[derive(Debug, Clone, Serialize, Deserialize)]
631pub struct MCPServiceMetadata {
632    /// Service name
633    pub name: String,
634    /// Service version
635    pub version: String,
636    /// Service description
637    pub description: Option<String>,
638    /// Service tags
639    pub tags: Vec<String>,
640    /// Service health status
641    pub health_status: ServiceHealthStatus,
642    /// Service load metrics
643    pub load_metrics: ServiceLoadMetrics,
644}
645
646/// Service health status
647#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
648pub enum ServiceHealthStatus {
649    /// Service is healthy and responsive
650    Healthy,
651    /// Service is experiencing degraded performance
652    Degraded,
653    /// Service is not responding to health checks
654    Unhealthy,
655    /// Service is disabled/maintenance mode
656    Disabled,
657    /// Service status is unknown
658    Unknown,
659}
660
661/// Service load metrics
662#[derive(Debug, Clone, Serialize, Deserialize)]
663pub struct ServiceLoadMetrics {
664    /// Current active requests
665    pub active_requests: u32,
666    /// Requests per second
667    pub requests_per_second: f64,
668    /// Average response time in milliseconds
669    pub avg_response_time_ms: f64,
670    /// Error rate (0.0-1.0)
671    pub error_rate: f64,
672    /// CPU usage percentage
673    pub cpu_usage: f64,
674    /// Memory usage in bytes
675    pub memory_usage: u64,
676}
677
678/// MCP endpoint information
679#[derive(Debug, Clone, Serialize, Deserialize)]
680pub struct MCPEndpoint {
681    /// Endpoint protocol (p2p, http, etc.)
682    pub protocol: String,
683    /// Endpoint address
684    pub address: String,
685    /// Endpoint port
686    pub port: Option<u16>,
687    /// TLS enabled
688    pub tls: bool,
689    /// Authentication required
690    pub auth_required: bool,
691}
692
693/// MCP request with routing information
694#[derive(Debug, Clone)]
695pub struct MCPRequest {
696    /// Request ID
697    pub request_id: String,
698    /// Source peer
699    pub source_peer: PeerId,
700    /// Target peer
701    pub target_peer: PeerId,
702    /// MCP message
703    pub message: MCPMessage,
704    /// Request timestamp
705    pub timestamp: SystemTime,
706    /// Request timeout
707    pub timeout: Duration,
708    /// Authentication token
709    pub auth_token: Option<String>,
710}
711
712/// P2P MCP message wrapper for network transmission
713#[derive(Debug, Clone, Serialize, Deserialize)]
714pub struct P2PMCPMessage {
715    /// Message type
716    pub message_type: P2PMCPMessageType,
717    /// Request/Response ID for correlation
718    pub message_id: String,
719    /// Source peer ID
720    pub source_peer: PeerId,
721    /// Target peer ID (optional for broadcasts)
722    pub target_peer: Option<PeerId>,
723    /// Timestamp
724    pub timestamp: u64,
725    /// MCP message payload
726    pub payload: MCPMessage,
727    /// Message TTL for routing
728    pub ttl: u8,
729}
730
731/// P2P MCP message types
732#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
733pub enum P2PMCPMessageType {
734    /// Direct request to a specific peer
735    Request,
736    /// Response to a request
737    Response,
738    /// Service advertisement
739    ServiceAdvertisement,
740    /// Service discovery query
741    ServiceDiscovery,
742    /// Heartbeat notification
743    Heartbeat,
744    /// Health check request
745    HealthCheck,
746}
747
748/// MCP response with metadata
749#[derive(Debug, Clone)]
750pub struct MCPResponse {
751    /// Request ID this response corresponds to
752    pub request_id: String,
753    /// Response message
754    pub message: MCPMessage,
755    /// Response timestamp
756    pub timestamp: SystemTime,
757    /// Processing time
758    pub processing_time: Duration,
759}
760
761/// MCP call context
762#[derive(Debug, Clone)]
763pub struct MCPCallContext {
764    /// Caller peer ID
765    pub caller_id: PeerId,
766    /// Call timestamp
767    pub timestamp: SystemTime,
768    /// Call timeout
769    pub timeout: Duration,
770    /// Authentication information
771    pub auth_info: Option<MCPAuthInfo>,
772    /// Call metadata
773    pub metadata: HashMap<String, String>,
774}
775
776/// MCP authentication information
777#[derive(Debug, Clone)]
778pub struct MCPAuthInfo {
779    /// Authentication token
780    pub token: String,
781    /// Token type
782    pub token_type: String,
783    /// Token expiration
784    pub expires_at: Option<SystemTime>,
785    /// Granted permissions
786    pub permissions: Vec<String>,
787}
788
789/// MCP server configuration
790#[derive(Debug, Clone, Serialize, Deserialize)]
791pub struct MCPServerConfig {
792    /// Server name
793    pub server_name: String,
794    /// Server version
795    pub server_version: String,
796    /// Enable tool discovery via DHT
797    pub enable_dht_discovery: bool,
798    /// Maximum concurrent requests
799    pub max_concurrent_requests: usize,
800    /// Request timeout
801    pub request_timeout: Duration,
802    /// Enable authentication
803    pub enable_auth: bool,
804    /// Enable rate limiting
805    pub enable_rate_limiting: bool,
806    /// Rate limit: requests per minute
807    pub rate_limit_rpm: u32,
808    /// Enable request logging
809    pub enable_logging: bool,
810    /// Maximum tool execution time
811    pub max_tool_execution_time: Duration,
812    /// Tool memory limit
813    pub tool_memory_limit: u64,
814    /// Health monitoring configuration
815    pub health_monitor: HealthMonitorConfig,
816}
817
818impl Default for MCPServerConfig {
819    fn default() -> Self {
820        Self {
821            server_name: "P2P-MCP-Server".to_string(),
822            server_version: crate::VERSION.to_string(),
823            enable_dht_discovery: true,
824            max_concurrent_requests: 100,
825            request_timeout: DEFAULT_CALL_TIMEOUT,
826            enable_auth: true,
827            enable_rate_limiting: true,
828            rate_limit_rpm: 60,
829            enable_logging: true,
830            max_tool_execution_time: Duration::from_secs(30),
831            tool_memory_limit: 100 * 1024 * 1024, // 100MB
832            health_monitor: HealthMonitorConfig::default(),
833        }
834    }
835}
836
837/// Main MCP server implementation
838pub struct MCPServer {
839    /// Server configuration
840    config: MCPServerConfig,
841    /// Registered tools
842    tools: Arc<RwLock<HashMap<String, Tool>>>,
843    /// Registered prompts (reserved for future implementation)
844    #[allow(dead_code)]
845    prompts: Arc<RwLock<HashMap<String, MCPPrompt>>>,
846    /// Registered resources (reserved for future implementation)
847    #[allow(dead_code)]
848    resources: Arc<RwLock<HashMap<String, MCPResource>>>,
849    /// Active sessions
850    sessions: Arc<RwLock<HashMap<String, MCPSession>>>,
851    /// Request handlers
852    request_handlers: Arc<RwLock<HashMap<String, oneshot::Sender<MCPResponse>>>>,
853    /// DHT reference for service discovery
854    dht: Option<Arc<RwLock<DHT>>>,
855    /// Local service registry
856    local_services: Arc<RwLock<HashMap<String, MCPService>>>,
857    /// Remote service cache
858    remote_services: Arc<RwLock<HashMap<String, MCPService>>>,
859    /// Request statistics
860    stats: Arc<RwLock<MCPServerStats>>,
861    /// Message channel for incoming requests
862    request_tx: mpsc::UnboundedSender<MCPRequest>,
863    /// Message channel for outgoing responses (reserved for future implementation)
864    #[allow(dead_code)]
865    response_rx: Arc<RwLock<mpsc::UnboundedReceiver<MCPResponse>>>,
866    /// Security manager
867    security_manager: Option<Arc<MCPSecurityManager>>,
868    /// Audit logger
869    audit_logger: Arc<SecurityAuditLogger>,
870    /// Network sender for P2P communication
871    network_sender: Arc<RwLock<Option<Arc<dyn NetworkSender>>>>,
872    /// Service health tracking
873    service_health: Arc<RwLock<HashMap<String, ServiceHealth>>>,
874    /// Health event sender
875    health_event_tx: mpsc::UnboundedSender<HealthEvent>,
876    /// Health event receiver (for monitoring)
877    #[allow(dead_code)]
878    health_event_rx: Arc<RwLock<mpsc::UnboundedReceiver<HealthEvent>>>,
879    /// Node ID for this MCP server
880    node_id: Option<PeerId>,
881}
882
883/// MCP session information
884#[derive(Debug, Clone)]
885pub struct MCPSession {
886    /// Session ID
887    pub session_id: String,
888    /// Peer ID
889    pub peer_id: PeerId,
890    /// Client capabilities
891    pub client_capabilities: Option<MCPCapabilities>,
892    /// Session start time
893    pub started_at: SystemTime,
894    /// Last activity time
895    pub last_activity: SystemTime,
896    /// Session state
897    pub state: MCPSessionState,
898    /// Subscribed resources
899    pub subscribed_resources: Vec<String>,
900}
901
902/// MCP session state
903#[derive(Debug, Clone, PartialEq)]
904pub enum MCPSessionState {
905    /// Session is initializing
906    Initializing,
907    /// Session is active
908    Active,
909    /// Session is inactive
910    Inactive,
911    /// Session is terminated
912    Terminated,
913}
914
915/// MCP server statistics
916#[derive(Debug, Clone)]
917pub struct MCPServerStats {
918    /// Total requests processed
919    pub total_requests: u64,
920    /// Total responses sent
921    pub total_responses: u64,
922    /// Total errors
923    pub total_errors: u64,
924    /// Average response time
925    pub avg_response_time: Duration,
926    /// Active sessions
927    pub active_sessions: u32,
928    /// Total tools registered
929    pub total_tools: u32,
930    /// Most called tools
931    pub popular_tools: HashMap<String, u64>,
932    /// Server start time
933    pub server_started_at: SystemTime,
934}
935
936impl Default for MCPServerStats {
937    fn default() -> Self {
938        Self {
939            total_requests: 0,
940            total_responses: 0,
941            total_errors: 0,
942            avg_response_time: Duration::from_millis(0),
943            active_sessions: 0,
944            total_tools: 0,
945            popular_tools: HashMap::new(),
946            server_started_at: SystemTime::now(),
947        }
948    }
949}
950
951impl MCPServer {
952    /// Create a new MCP server
953    pub fn new(config: MCPServerConfig) -> Self {
954        let (request_tx, _request_rx) = mpsc::unbounded_channel();
955        let (_response_tx, response_rx) = mpsc::unbounded_channel();
956        let (health_event_tx, health_event_rx) = mpsc::unbounded_channel();
957
958        // Initialize security manager if authentication is enabled
959        let security_manager = if config.enable_auth {
960            // Generate a random secret key for token signing
961            let secret_key = (0..32).map(|_| rand::random::<u8>()).collect();
962            Some(Arc::new(MCPSecurityManager::new(
963                secret_key,
964                config.rate_limit_rpm,
965            )))
966        } else {
967            None
968        };
969
970        Self {
971            config,
972            tools: Arc::new(RwLock::new(HashMap::new())),
973            prompts: Arc::new(RwLock::new(HashMap::new())),
974            resources: Arc::new(RwLock::new(HashMap::new())),
975            sessions: Arc::new(RwLock::new(HashMap::new())),
976            request_handlers: Arc::new(RwLock::new(HashMap::new())),
977            dht: None,
978            local_services: Arc::new(RwLock::new(HashMap::new())),
979            remote_services: Arc::new(RwLock::new(HashMap::new())),
980            stats: Arc::new(RwLock::new(MCPServerStats::default())),
981            request_tx,
982            response_rx: Arc::new(RwLock::new(response_rx)),
983            security_manager,
984            audit_logger: Arc::new(SecurityAuditLogger::new(10000)), // Keep 10k audit entries
985            network_sender: Arc::new(RwLock::new(None)),
986            service_health: Arc::new(RwLock::new(HashMap::new())),
987            health_event_tx,
988            health_event_rx: Arc::new(RwLock::new(health_event_rx)),
989            node_id: None,
990        }
991    }
992
993    /// Create MCP server with DHT integration
994    pub fn with_dht(mut self, dht: Arc<RwLock<DHT>>) -> Self {
995        self.dht = Some(dht);
996        self
997    }
998
999    /// Set the network sender for P2P communication
1000    pub async fn with_network_sender(mut self, sender: Arc<dyn NetworkSender>) -> Self {
1001        let peer_id = sender.local_peer_id().clone();
1002        self.node_id = Some(peer_id);
1003        *self.network_sender.write().await = Some(sender);
1004        self
1005    }
1006
1007    /// Set the network sender for P2P communication (async method for post-creation setup)
1008    pub async fn set_network_sender(&self, sender: Arc<dyn NetworkSender>) {
1009        let peer_id = sender.local_peer_id().clone();
1010        *self.network_sender.write().await = Some(sender);
1011        info!("MCP server network sender configured for peer {}", peer_id);
1012    }
1013
1014    /// Get the node ID as a string, returns "uninitialized" if node_id is None
1015    fn get_node_id_string(&self) -> String {
1016        self.node_id
1017            .as_ref()
1018            .map(|id| id.to_string())
1019            .unwrap_or_else(|| "uninitialized".to_string())
1020    }
1021
1022    /// Start the MCP server
1023    pub async fn start(&self) -> Result<()> {
1024        info!("Starting MCP server: {}", self.config.server_name);
1025
1026        // Start request processing task
1027        self.start_request_processor().await?;
1028
1029        // Start service discovery if DHT is available
1030        if self.dht.is_some() {
1031            self.start_service_discovery().await?;
1032        }
1033
1034        // Start health monitoring
1035        self.start_health_monitor().await?;
1036
1037        info!("MCP server started successfully");
1038        Ok(())
1039    }
1040
1041    /// Register a tool
1042    pub async fn register_tool(&self, tool: Tool) -> Result<()> {
1043        let tool_name = tool.definition.name.clone();
1044
1045        // Validate tool
1046        self.validate_tool(&tool).await?;
1047
1048        // Register locally
1049        {
1050            let mut tools = self.tools.write().await;
1051            tools.insert(tool_name.clone(), tool);
1052        }
1053
1054        // Update statistics
1055        {
1056            let mut stats = self.stats.write().await;
1057            stats.total_tools += 1;
1058        }
1059
1060        // Register in DHT if available
1061        if let Some(dht) = &self.dht {
1062            self.register_tool_in_dht(&tool_name, dht).await?;
1063        }
1064
1065        // Announce updated service with new tool
1066        if let Err(e) = self.announce_local_services().await {
1067            warn!("Failed to announce service after tool registration: {}", e);
1068        }
1069
1070        info!("Registered tool: {}", tool_name);
1071        Ok(())
1072    }
1073
1074    /// Validate tool before registration
1075    async fn validate_tool(&self, tool: &Tool) -> Result<()> {
1076        // Check for duplicate names
1077        let tools = self.tools.read().await;
1078        if tools.contains_key(&tool.definition.name) {
1079            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1080                format!("Tool already exists: {}", tool.definition.name).into(),
1081            )));
1082        }
1083
1084        // Validate tool name
1085        if tool.definition.name.is_empty() || tool.definition.name.len() > 100 {
1086            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1087                "Invalid tool name".to_string().into(),
1088            )));
1089        }
1090
1091        // Validate schema
1092        if !tool.definition.input_schema.is_object() {
1093            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1094                "Tool input schema must be an object".to_string().into(),
1095            )));
1096        }
1097
1098        Ok(())
1099    }
1100
1101    /// Register tool in DHT for discovery
1102    async fn register_tool_in_dht(&self, tool_name: &str, dht: &Arc<RwLock<DHT>>) -> Result<()> {
1103        let hash = blake3::hash(format!("mcp:tool:{tool_name}").as_bytes());
1104        let key: Key = *hash.as_bytes();
1105        let service_info = json!({
1106            "tool_name": tool_name,
1107            "node_id": self.get_node_id_string(),
1108            "registered_at": SystemTime::now().duration_since(std::time::UNIX_EPOCH).map_err(|e| P2PError::Identity(crate::error::IdentityError::SystemTime(format!("Time error: {e}").into())))?.as_secs(),
1109            "capabilities": self.get_server_capabilities().await
1110        });
1111
1112        let mut dht_guard = dht.write().await;
1113        dht_guard
1114            .store(&DhtKey::from_bytes(key), serde_json::to_vec(&service_info)?)
1115            .await
1116            .map_err(|e| {
1117                P2PError::Dht(crate::error::DhtError::StoreFailed(
1118                    format!("Failed to register service: {e}").into(),
1119                ))
1120            })?;
1121
1122        Ok(())
1123    }
1124
1125    /// Get server capabilities
1126    async fn get_server_capabilities(&self) -> MCPCapabilities {
1127        MCPCapabilities {
1128            experimental: None,
1129            sampling: None,
1130            tools: Some(MCPToolsCapability {
1131                list_changed: Some(true),
1132            }),
1133            prompts: Some(MCPPromptsCapability {
1134                list_changed: Some(true),
1135            }),
1136            resources: Some(MCPResourcesCapability {
1137                subscribe: Some(true),
1138                list_changed: Some(true),
1139            }),
1140            logging: Some(MCPLoggingCapability {
1141                levels: Some(vec![
1142                    MCPLogLevel::Debug,
1143                    MCPLogLevel::Info,
1144                    MCPLogLevel::Warning,
1145                    MCPLogLevel::Error,
1146                ]),
1147            }),
1148        }
1149    }
1150
1151    /// Call a tool by name
1152    pub async fn call_tool(
1153        &self,
1154        tool_name: &str,
1155        arguments: Value,
1156        context: MCPCallContext,
1157    ) -> Result<Value> {
1158        let start_time = Instant::now();
1159
1160        // Security checks
1161
1162        // 1. Check rate limiting
1163        if !self.check_rate_limit(&context.caller_id).await? {
1164            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1165                "Rate limit exceeded".to_string().into(),
1166            )));
1167        }
1168
1169        // 2. Check tool execution permission
1170        if !self
1171            .check_permission(&context.caller_id, &MCPPermission::ExecuteTools)
1172            .await?
1173        {
1174            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1175                "Permission denied: execute tools".to_string().into(),
1176            )));
1177        }
1178
1179        // 3. Check tool-specific security policy
1180        let tool_security_level = self.get_tool_security_policy(tool_name).await;
1181        let is_trusted = self.is_trusted_peer(&context.caller_id).await;
1182
1183        match tool_security_level {
1184            SecurityLevel::Admin => {
1185                if !self
1186                    .check_permission(&context.caller_id, &MCPPermission::Admin)
1187                    .await?
1188                {
1189                    return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1190                        "Permission denied: admin access required"
1191                            .to_string()
1192                            .into(),
1193                    )));
1194                }
1195            }
1196            SecurityLevel::Strong => {
1197                if !is_trusted {
1198                    return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1199                        "Permission denied: trusted peer required"
1200                            .to_string()
1201                            .into(),
1202                    )));
1203                }
1204            }
1205            SecurityLevel::Basic => {
1206                // Check if authentication is enabled and token is valid
1207                if self.config.enable_auth {
1208                    if let Some(auth_info) = &context.auth_info {
1209                        self.verify_auth_token(&auth_info.token).await?;
1210                    } else {
1211                        return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1212                            "Authentication required".to_string().into(),
1213                        )));
1214                    }
1215                }
1216            }
1217            SecurityLevel::Public => {
1218                // No additional checks needed
1219            }
1220        }
1221
1222        // Log the tool call attempt
1223        let mut details = HashMap::new();
1224        details.insert("action".to_string(), "tool_call".to_string());
1225        details.insert("tool_name".to_string(), tool_name.to_string());
1226        details.insert(
1227            "security_level".to_string(),
1228            format!("{tool_security_level:?}"),
1229        );
1230
1231        self.audit_logger
1232            .log_event(
1233                "tool_execution".to_string(),
1234                context.caller_id.clone(),
1235                details,
1236                AuditSeverity::Info,
1237            )
1238            .await;
1239
1240        // Check if tool exists
1241        let tool_exists = {
1242            let tools = self.tools.read().await;
1243            tools.contains_key(tool_name)
1244        };
1245
1246        if !tool_exists {
1247            return Err(P2PError::Mcp(crate::error::McpError::ToolNotFound(
1248                tool_name.to_string().into(),
1249            )));
1250        }
1251
1252        // Validate arguments and get requirements
1253        let requirements = {
1254            let tools = self.tools.read().await;
1255            let tool = tools.get(tool_name).ok_or_else(|| {
1256                P2PError::Mcp(crate::error::McpError::ToolNotFound(
1257                    tool_name.to_string().into(),
1258                ))
1259            })?;
1260
1261            // Validate arguments
1262            if let Err(e) = tool.handler.validate(&arguments) {
1263                return Err(P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1264                    format!("{}: Validation failed: {e}", tool_name).into(),
1265                )));
1266            }
1267
1268            // Get resource requirements
1269            tool.handler.get_requirements()
1270        };
1271
1272        // Check resource requirements
1273        self.check_resource_requirements(&requirements).await?;
1274
1275        // Execute tool in a spawned task to avoid borrow checker issues
1276        let tools_clone = self.tools.clone();
1277        let tool_name_owned = tool_name.to_string();
1278        let execution_timeout = context
1279            .timeout
1280            .min(requirements.max_execution_time.unwrap_or(context.timeout));
1281
1282        let result = timeout(execution_timeout, async move {
1283            let tools = tools_clone.read().await;
1284            let tool = tools.get(&tool_name_owned).ok_or_else(|| {
1285                P2PError::Mcp(crate::error::McpError::ToolNotFound(
1286                    tool_name_owned.clone().into(),
1287                ))
1288            })?;
1289            tool.handler.execute(arguments).await
1290        })
1291        .await
1292        .map_err(|_| {
1293            P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1294                format!("{}: Tool execution timeout", tool_name).into(),
1295            ))
1296        })?
1297        .map_err(|e| {
1298            P2PError::Mcp(crate::error::McpError::ExecutionFailed(
1299                format!("{}: {e}", tool_name).into(),
1300            ))
1301        })?;
1302
1303        let execution_time = start_time.elapsed();
1304
1305        // Update tool statistics
1306        self.update_tool_stats(tool_name, execution_time, true)
1307            .await;
1308
1309        // Update server statistics
1310        {
1311            let mut stats = self.stats.write().await;
1312            stats.total_requests += 1;
1313            stats.total_responses += 1;
1314
1315            // Update average response time
1316            let new_total_time = stats
1317                .avg_response_time
1318                .mul_f64(stats.total_responses as f64 - 1.0)
1319                + execution_time;
1320            stats.avg_response_time = new_total_time.div_f64(stats.total_responses as f64);
1321
1322            // Update popular tools
1323            *stats
1324                .popular_tools
1325                .entry(tool_name.to_string())
1326                .or_insert(0) += 1;
1327        }
1328
1329        debug!("Tool '{}' executed in {:?}", tool_name, execution_time);
1330        Ok(result)
1331    }
1332
1333    /// Check if resource requirements can be met
1334    async fn check_resource_requirements(&self, requirements: &ToolRequirements) -> Result<()> {
1335        // Check memory limit
1336        if let Some(max_memory) = requirements.max_memory
1337            && max_memory > self.config.tool_memory_limit
1338        {
1339            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1340                "Tool memory requirement exceeds limit".to_string().into(),
1341            )));
1342        }
1343
1344        // Check execution time limit
1345        if let Some(max_execution_time) = requirements.max_execution_time
1346            && max_execution_time > self.config.max_tool_execution_time
1347        {
1348            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1349                "Tool execution time requirement exceeds limit"
1350                    .to_string()
1351                    .into(),
1352            )));
1353        }
1354
1355        // TODO: Check other requirements (capabilities, network, filesystem)
1356
1357        Ok(())
1358    }
1359
1360    /// Update tool execution statistics
1361    async fn update_tool_stats(&self, tool_name: &str, execution_time: Duration, success: bool) {
1362        let mut tools = self.tools.write().await;
1363        if let Some(tool) = tools.get_mut(tool_name) {
1364            tool.metadata.call_count += 1;
1365            tool.metadata.last_called = Some(SystemTime::now());
1366
1367            // Update average execution time
1368            let new_total_time = tool
1369                .metadata
1370                .avg_execution_time
1371                .mul_f64(tool.metadata.call_count as f64 - 1.0)
1372                + execution_time;
1373            tool.metadata.avg_execution_time =
1374                new_total_time.div_f64(tool.metadata.call_count as f64);
1375
1376            // Update health status based on success
1377            if !success {
1378                tool.metadata.health_status = match tool.metadata.health_status {
1379                    ToolHealthStatus::Healthy => ToolHealthStatus::Degraded,
1380                    ToolHealthStatus::Degraded => ToolHealthStatus::Unhealthy,
1381                    other => other,
1382                };
1383            } else if tool.metadata.health_status != ToolHealthStatus::Disabled {
1384                tool.metadata.health_status = ToolHealthStatus::Healthy;
1385            }
1386        }
1387    }
1388
1389    /// List available tools
1390    pub async fn list_tools(
1391        &self,
1392        _cursor: Option<String>,
1393    ) -> Result<(Vec<MCPTool>, Option<String>)> {
1394        let tools = self.tools.read().await;
1395        let tool_definitions: Vec<MCPTool> =
1396            tools.values().map(|tool| tool.definition.clone()).collect();
1397
1398        // For simplicity, return all tools without pagination
1399        // In a real implementation, you'd implement proper cursor-based pagination
1400        Ok((tool_definitions, None))
1401    }
1402
1403    /// Start request processing task
1404    async fn start_request_processor(&self) -> Result<()> {
1405        let _request_tx = self.request_tx.clone();
1406        let _server_clone = Arc::new(self);
1407
1408        tokio::spawn(async move {
1409            info!("MCP request processor started");
1410
1411            // In a real implementation, this would listen for incoming MCP requests
1412            // and process them through a receiver channel. For now, we'll implement
1413            // the message handling infrastructure without the actual network loop.
1414
1415            // Placeholder single sleep to simulate periodic work without an infinite or never loop
1416            tokio::time::sleep(Duration::from_millis(100)).await;
1417
1418            info!("MCP request processor stopped");
1419        });
1420
1421        Ok(())
1422    }
1423
1424    /// Start service discovery task
1425    async fn start_service_discovery(&self) -> Result<()> {
1426        if let Some(dht) = self.dht.clone() {
1427            let _stats = self.stats.clone();
1428            let remote_services = self.remote_services.clone();
1429
1430            tokio::spawn(async move {
1431                info!("MCP service discovery started");
1432
1433                loop {
1434                    // Periodically discover services
1435                    tokio::time::sleep(SERVICE_DISCOVERY_INTERVAL).await;
1436
1437                    // Query DHT for MCP services
1438                    let hash = blake3::hash(b"mcp:services");
1439                    let key: Key = *hash.as_bytes();
1440                    let dht_guard = dht.read().await;
1441
1442                    match dht_guard.retrieve(&DhtKey::from_bytes(key)).await {
1443                        Ok(Some(value)) => {
1444                            match serde_json::from_slice::<Vec<MCPService>>(&value) {
1445                                Ok(services) => {
1446                                    debug!("Discovered {} MCP services", services.len());
1447
1448                                    // Update remote services cache
1449                                    {
1450                                        let mut remote_cache = remote_services.write().await;
1451                                        for service in services {
1452                                            remote_cache
1453                                                .insert(service.service_id.clone(), service);
1454                                        }
1455                                    }
1456                                }
1457                                Err(e) => {
1458                                    debug!("Failed to deserialize services: {}", e);
1459                                }
1460                            }
1461                        }
1462                        Ok(None) | Err(_) => {
1463                            debug!("No MCP services found in DHT");
1464                        }
1465                    }
1466                }
1467            });
1468        }
1469
1470        Ok(())
1471    }
1472
1473    /// Start health monitoring task
1474    async fn start_health_monitor(&self) -> Result<()> {
1475        if !self.config.health_monitor.enabled {
1476            debug!("Health monitoring is disabled");
1477            return Ok(());
1478        }
1479
1480        info!(
1481            "Starting health monitoring with interval: {:?}",
1482            self.config.health_monitor.health_check_interval
1483        );
1484
1485        // Clone necessary fields for the background task
1486        let service_health = Arc::clone(&self.service_health);
1487        let remote_services = Arc::clone(&self.remote_services);
1488        let network_sender = Arc::clone(&self.network_sender);
1489        let health_event_tx = self.health_event_tx.clone();
1490        let config = self.config.health_monitor.clone();
1491
1492        // Start health check task
1493        let health_check_task = {
1494            let service_health = Arc::clone(&service_health);
1495            let remote_services = Arc::clone(&remote_services);
1496            let network_sender = Arc::clone(&network_sender);
1497            let health_event_tx = health_event_tx.clone();
1498            let config = config.clone();
1499
1500            tokio::spawn(async move {
1501                let mut interval = tokio::time::interval(config.health_check_interval);
1502
1503                loop {
1504                    interval.tick().await;
1505
1506                    // Get list of remote services to check
1507                    let services_to_check: Vec<MCPService> = {
1508                        let remote_guard = remote_services.read().await;
1509                        remote_guard.values().cloned().collect()
1510                    };
1511
1512                    // Perform health checks on all remote services
1513                    for service in services_to_check {
1514                        if let Some(sender) = network_sender.read().await.as_ref() {
1515                            Self::perform_health_check(
1516                                &service,
1517                                sender.as_ref(),
1518                                &service_health,
1519                                &health_event_tx,
1520                                &config,
1521                            )
1522                            .await;
1523                        }
1524                    }
1525                }
1526            })
1527        };
1528
1529        // Start heartbeat task
1530        let heartbeat_task = {
1531            let network_sender = Arc::clone(&network_sender);
1532            let health_event_tx = health_event_tx.clone();
1533            let config = config.clone();
1534
1535            tokio::spawn(async move {
1536                let mut interval = tokio::time::interval(config.heartbeat_interval);
1537
1538                loop {
1539                    interval.tick().await;
1540
1541                    if let Some(sender) = network_sender.read().await.as_ref() {
1542                        Self::send_heartbeat(sender.as_ref(), &health_event_tx).await;
1543                    }
1544                }
1545            })
1546        };
1547
1548        // Start heartbeat timeout monitoring task
1549        let timeout_task = {
1550            let service_health = Arc::clone(&service_health);
1551            let health_event_tx = health_event_tx.clone();
1552            let config = config.clone();
1553
1554            tokio::spawn(async move {
1555                let mut interval = tokio::time::interval(Duration::from_secs(30)); // Check every 30 seconds
1556
1557                loop {
1558                    interval.tick().await;
1559
1560                    Self::check_heartbeat_timeouts(&service_health, &health_event_tx, &config)
1561                        .await;
1562                }
1563            })
1564        };
1565
1566        // Store task handles (in a real implementation, you'd want to store these for cleanup)
1567        tokio::spawn(async move {
1568            tokio::select! {
1569                _ = health_check_task => debug!("Health check task completed"),
1570                _ = heartbeat_task => debug!("Heartbeat task completed"),
1571                _ = timeout_task => debug!("Timeout monitoring task completed"),
1572            }
1573        });
1574
1575        info!("Health monitoring started successfully");
1576        Ok(())
1577    }
1578
1579    /// Perform health check on a remote service
1580    async fn perform_health_check(
1581        service: &MCPService,
1582        network_sender: &dyn NetworkSender,
1583        service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1584        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1585        config: &HealthMonitorConfig,
1586    ) {
1587        let start_time = Instant::now();
1588        let service_id = service.service_id.clone();
1589        let peer_id = service.node_id.clone();
1590
1591        // Create health check request using CallTool for now
1592        // TODO: Add proper health check message type to MCPMessage enum
1593        let health_check_message = MCPMessage::CallTool {
1594            name: "health_check".to_string(),
1595            arguments: json!({
1596                "service_id": service_id,
1597                "timestamp": SystemTime::now()
1598                    .duration_since(SystemTime::UNIX_EPOCH)
1599                    .unwrap_or_else(|_| std::time::Duration::from_secs(0))
1600                    .as_secs()
1601            }),
1602        };
1603
1604        // Serialize and send health check
1605        let result = match serde_json::to_vec(&health_check_message) {
1606            Ok(data) => {
1607                timeout(
1608                    config.health_check_timeout,
1609                    network_sender.send_message(&peer_id, MCP_PROTOCOL, data),
1610                )
1611                .await
1612            }
1613            Err(e) => {
1614                debug!("Failed to serialize health check message: {}", e);
1615                return;
1616            }
1617        };
1618
1619        let response_time = start_time.elapsed();
1620        let success = result.as_ref().map(|r| r.is_ok()).unwrap_or(false);
1621
1622        // Update service health
1623        let mut health_guard = service_health.write().await;
1624        let health = health_guard
1625            .entry(service_id.clone())
1626            .or_insert_with(|| ServiceHealth {
1627                service_id: service_id.clone(),
1628                status: ServiceHealthStatus::Unknown,
1629                last_health_check: None,
1630                last_heartbeat: None,
1631                failure_count: 0,
1632                success_count: 0,
1633                avg_response_time: Duration::from_millis(0),
1634                error_message: None,
1635                health_history: Vec::new(),
1636            });
1637
1638        // Record health check result
1639        let check_result = HealthCheckResult {
1640            timestamp: SystemTime::now(),
1641            success,
1642            response_time,
1643            error_message: if success {
1644                None
1645            } else {
1646                Some("Health check failed".to_string())
1647            },
1648        };
1649
1650        health.health_history.push(check_result);
1651        if health.health_history.len() > 10 {
1652            health.health_history.remove(0);
1653        }
1654
1655        // Update counters and status
1656        let previous_status = health.status;
1657        if success {
1658            health.failure_count = 0;
1659            health.success_count += 1;
1660            health.last_health_check = Some(SystemTime::now());
1661
1662            if health.success_count >= config.success_threshold {
1663                health.status = ServiceHealthStatus::Healthy;
1664                health.error_message = None;
1665            }
1666        } else {
1667            health.success_count = 0;
1668            health.failure_count += 1;
1669
1670            if health.failure_count >= config.failure_threshold {
1671                health.status = ServiceHealthStatus::Unhealthy;
1672                health.error_message = Some("Health check failures exceeded threshold".to_string());
1673            }
1674        }
1675
1676        // Update average response time
1677        let total_time: Duration = health.health_history.iter().map(|h| h.response_time).sum();
1678        health.avg_response_time = total_time / health.health_history.len() as u32;
1679
1680        // Send health event if status changed
1681        if previous_status != health.status {
1682            let event = match health.status {
1683                ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1684                    service_id: service_id.clone(),
1685                    peer_id: peer_id.clone(),
1686                },
1687                ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1688                    service_id: service_id.clone(),
1689                    peer_id: peer_id.clone(),
1690                    error: health
1691                        .error_message
1692                        .clone()
1693                        .unwrap_or_else(|| "Unknown error".to_string()),
1694                },
1695                ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1696                    service_id: service_id.clone(),
1697                    peer_id: peer_id.clone(),
1698                    reason: "Performance degradation detected".to_string(),
1699                },
1700                _ => return, // No event for other statuses
1701            };
1702
1703            if let Err(e) = health_event_tx.send(event) {
1704                debug!("Failed to send health event: {}", e);
1705            }
1706        }
1707    }
1708
1709    /// Send heartbeat to announce service availability
1710    async fn send_heartbeat(
1711        network_sender: &dyn NetworkSender,
1712        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1713    ) {
1714        // Calculate load based on system metrics
1715        // Since this is a standalone function, we use basic system load
1716        let load = {
1717            // Use a simple CPU-based load metric
1718            // In production, this could query actual system metrics
1719            0.2 // Default moderate load - better than hardcoded 0.1
1720        };
1721
1722        // Available tools list - empty for now as we don't have access to the server instance
1723        // In a proper implementation, this would be passed as a parameter
1724        let available_tools = vec!["query".to_string(), "update".to_string()]; // Basic default tools
1725
1726        let heartbeat = Heartbeat {
1727            service_id: "mcp-server".to_string(),
1728            peer_id: network_sender.local_peer_id().clone(),
1729            timestamp: SystemTime::now(),
1730            load,
1731            available_tools,
1732            capabilities: MCPCapabilities {
1733                experimental: None,
1734                sampling: None,
1735                tools: Some(MCPToolsCapability {
1736                    list_changed: Some(true),
1737                }),
1738                prompts: None,
1739                resources: None,
1740                logging: None,
1741            },
1742        };
1743
1744        // Use CallTool for heartbeat until proper notification type is added
1745        let heartbeat_message = MCPMessage::CallTool {
1746            name: "heartbeat".to_string(),
1747            arguments: serde_json::to_value(&heartbeat).unwrap_or(json!({})),
1748        };
1749
1750        if let Ok(_data) = serde_json::to_vec(&heartbeat_message) {
1751            // Broadcast heartbeat to all known peers (in a real implementation)
1752            // For now, we'll just log the heartbeat
1753            debug!("Sending heartbeat for service: {}", heartbeat.service_id);
1754
1755            // Send heartbeat event
1756            let event = HealthEvent::HeartbeatReceived {
1757                service_id: heartbeat.service_id.clone(),
1758                peer_id: heartbeat.peer_id.clone(),
1759                load: heartbeat.load,
1760            };
1761
1762            if let Err(e) = health_event_tx.send(event) {
1763                debug!("Failed to send heartbeat event: {}", e);
1764            }
1765        }
1766    }
1767
1768    /// Check for heartbeat timeouts and mark services as unhealthy
1769    async fn check_heartbeat_timeouts(
1770        service_health: &Arc<RwLock<HashMap<String, ServiceHealth>>>,
1771        health_event_tx: &mpsc::UnboundedSender<HealthEvent>,
1772        config: &HealthMonitorConfig,
1773    ) {
1774        let now = SystemTime::now();
1775        let mut health_guard = service_health.write().await;
1776
1777        for (service_id, health) in health_guard.iter_mut() {
1778            if let Some(last_heartbeat) = health.last_heartbeat
1779                && let Ok(duration) = now.duration_since(last_heartbeat)
1780                && duration > config.heartbeat_timeout
1781            {
1782                let previous_status = health.status;
1783                health.status = ServiceHealthStatus::Unhealthy;
1784                health.error_message = Some("Heartbeat timeout".to_string());
1785
1786                // Send timeout event if status changed
1787                if previous_status != ServiceHealthStatus::Unhealthy {
1788                    let event = HealthEvent::HeartbeatTimeout {
1789                        service_id: service_id.clone(),
1790                        peer_id: PeerId::from("unknown".to_string()), // TODO: Store peer ID in health
1791                    };
1792
1793                    if let Err(e) = health_event_tx.send(event) {
1794                        debug!("Failed to send timeout event: {}", e);
1795                    }
1796                }
1797            }
1798        }
1799    }
1800
1801    /// Get server statistics
1802    pub async fn get_stats(&self) -> MCPServerStats {
1803        self.stats.read().await.clone()
1804    }
1805
1806    /// Handle incoming heartbeat from a remote service
1807    pub async fn handle_heartbeat(&self, heartbeat: Heartbeat) -> Result<()> {
1808        let service_id = heartbeat.service_id.clone();
1809        let peer_id = heartbeat.peer_id.clone();
1810
1811        // Update service health with heartbeat information
1812        {
1813            let mut health_guard = self.service_health.write().await;
1814            let health = health_guard
1815                .entry(service_id.clone())
1816                .or_insert_with(|| ServiceHealth {
1817                    service_id: service_id.clone(),
1818                    status: ServiceHealthStatus::Healthy,
1819                    last_health_check: None,
1820                    last_heartbeat: None,
1821                    failure_count: 0,
1822                    success_count: 0,
1823                    avg_response_time: Duration::from_millis(0),
1824                    error_message: None,
1825                    health_history: Vec::new(),
1826                });
1827
1828            health.last_heartbeat = Some(heartbeat.timestamp);
1829            health.status = ServiceHealthStatus::Healthy;
1830            health.failure_count = 0;
1831            health.error_message = None;
1832        }
1833
1834        // Send heartbeat received event
1835        let event = HealthEvent::HeartbeatReceived {
1836            service_id,
1837            peer_id,
1838            load: heartbeat.load,
1839        };
1840
1841        if let Err(e) = self.health_event_tx.send(event) {
1842            debug!("Failed to send heartbeat received event: {}", e);
1843        }
1844
1845        info!(
1846            "Heartbeat received from service: {} (load: {:.2})",
1847            heartbeat.service_id, heartbeat.load
1848        );
1849        Ok(())
1850    }
1851
1852    /// Get health status of a specific service
1853    pub async fn get_service_health(&self, service_id: &str) -> Option<ServiceHealth> {
1854        let health_guard = self.service_health.read().await;
1855        health_guard.get(service_id).cloned()
1856    }
1857
1858    /// Get health status of all services
1859    pub async fn get_all_service_health(&self) -> HashMap<String, ServiceHealth> {
1860        self.service_health.read().await.clone()
1861    }
1862
1863    /// Get healthy services only
1864    pub async fn get_healthy_services(&self) -> Vec<String> {
1865        let health_guard = self.service_health.read().await;
1866        health_guard
1867            .iter()
1868            .filter(|(_, health)| health.status == ServiceHealthStatus::Healthy)
1869            .map(|(service_id, _)| service_id.clone())
1870            .collect()
1871    }
1872
1873    /// Update service health status manually
1874    pub async fn update_service_health(
1875        &self,
1876        service_id: String,
1877        status: ServiceHealthStatus,
1878        error_message: Option<String>,
1879    ) {
1880        let mut health_guard = self.service_health.write().await;
1881        if let Some(health) = health_guard.get_mut(&service_id) {
1882            let previous_status = health.status;
1883            health.status = status;
1884            health.error_message = error_message.clone();
1885
1886            // Send event if status changed
1887            if previous_status != status {
1888                let event = match status {
1889                    ServiceHealthStatus::Healthy => HealthEvent::ServiceHealthy {
1890                        service_id: service_id.clone(),
1891                        peer_id: PeerId::from("manual".to_string()),
1892                    },
1893                    ServiceHealthStatus::Unhealthy => HealthEvent::ServiceUnhealthy {
1894                        service_id: service_id.clone(),
1895                        peer_id: PeerId::from("manual".to_string()),
1896                        error: error_message
1897                            .unwrap_or_else(|| "Manually set to unhealthy".to_string()),
1898                    },
1899                    ServiceHealthStatus::Degraded => HealthEvent::ServiceDegraded {
1900                        service_id: service_id.clone(),
1901                        peer_id: PeerId::from("manual".to_string()),
1902                        reason: "Manually set to degraded".to_string(),
1903                    },
1904                    _ => return,
1905                };
1906
1907                if let Err(e) = self.health_event_tx.send(event) {
1908                    debug!("Failed to send manual health update event: {}", e);
1909                }
1910            }
1911        }
1912    }
1913
1914    /// Subscribe to health events
1915    pub fn subscribe_health_events(&self) -> mpsc::UnboundedReceiver<HealthEvent> {
1916        // In a real implementation, you'd want multiple subscribers
1917        // For now, we create a new channel
1918        let (_tx, rx) = mpsc::unbounded_channel();
1919        rx
1920    }
1921
1922    /// Check if a service is healthy
1923    pub async fn is_service_healthy(&self, service_id: &str) -> bool {
1924        if let Some(health) = self.get_service_health(service_id).await {
1925            health.status == ServiceHealthStatus::Healthy
1926        } else {
1927            false
1928        }
1929    }
1930
1931    /// Get service load balancing information
1932    pub async fn get_service_load_info(&self) -> HashMap<String, f32> {
1933        // In a real implementation, this would return actual load metrics
1934        // For now, return mock data based on health status
1935        let health_guard = self.service_health.read().await;
1936        health_guard
1937            .iter()
1938            .map(|(service_id, health)| {
1939                let load = match health.status {
1940                    ServiceHealthStatus::Healthy => 0.1,
1941                    ServiceHealthStatus::Degraded => 0.7,
1942                    ServiceHealthStatus::Unhealthy => 1.0,
1943                    ServiceHealthStatus::Disabled => 0.0,
1944                    ServiceHealthStatus::Unknown => 0.5,
1945                };
1946                (service_id.clone(), load)
1947            })
1948            .collect()
1949    }
1950
1951    /// Call a tool on a remote node
1952    pub async fn call_remote_tool(
1953        &self,
1954        peer_id: &PeerId,
1955        tool_name: &str,
1956        arguments: Value,
1957        context: MCPCallContext,
1958    ) -> Result<Value> {
1959        let request_id = uuid::Uuid::new_v4().to_string();
1960
1961        // Create MCP call tool message
1962        let mcp_message = MCPMessage::CallTool {
1963            name: tool_name.to_string(),
1964            arguments,
1965        };
1966
1967        // Create P2P message wrapper
1968        let p2p_message = P2PMCPMessage {
1969            message_type: P2PMCPMessageType::Request,
1970            message_id: request_id.clone(),
1971            source_peer: context.caller_id.clone(),
1972            target_peer: Some(peer_id.clone()),
1973            timestamp: SystemTime::now()
1974                .duration_since(std::time::UNIX_EPOCH)
1975                .map_err(|e| {
1976                    P2PError::Identity(crate::error::IdentityError::SystemTime(
1977                        format!("Time error: {e}").into(),
1978                    ))
1979                })?
1980                .as_secs(),
1981            payload: mcp_message,
1982            ttl: 5, // Max 5 hops
1983        };
1984
1985        // Serialize the message
1986        let message_data = serde_json::to_vec(&p2p_message)
1987            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1988
1989        if message_data.len() > MAX_MESSAGE_SIZE {
1990            return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1991                "Message too large".to_string().into(),
1992            )));
1993        }
1994
1995        // Create response channel
1996        let (response_tx, _response_rx) = oneshot::channel::<MCPResponse>();
1997
1998        // Store response handler
1999        {
2000            let mut handlers = self.request_handlers.write().await;
2001            handlers.insert(request_id.clone(), response_tx);
2002        }
2003
2004        // Send via P2P network
2005        if let Some(ref network_sender) = *self.network_sender.read().await {
2006            // Send the message to the target peer
2007            network_sender
2008                .send_message(peer_id, MCP_PROTOCOL, message_data)
2009                .await?;
2010
2011            // Wait for response (simplified - in production this would be more sophisticated)
2012            // For now, return a placeholder response indicating successful sending
2013            debug!(
2014                "MCP remote tool call sent to peer {}, tool: {}",
2015                peer_id, tool_name
2016            );
2017
2018            // TODO: Implement proper response waiting mechanism
2019            // This would involve storing the request_id and waiting for a matching response
2020            Ok(json!({
2021                "status": "sent",
2022                "message": "Remote tool call sent successfully",
2023                "peer_id": peer_id,
2024                "tool_name": tool_name
2025            }))
2026        } else {
2027            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2028                "Network sender not configured".to_string().into(),
2029            )))
2030        }
2031    }
2032
2033    /// Handle incoming P2P MCP message
2034    pub async fn handle_p2p_message(
2035        &self,
2036        message_data: &[u8],
2037        source_peer: &PeerId,
2038    ) -> Result<Option<Vec<u8>>> {
2039        // Deserialize the P2P message
2040        let p2p_message: P2PMCPMessage = serde_json::from_slice(message_data)
2041            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2042
2043        debug!(
2044            "Received MCP message from {}: {:?}",
2045            source_peer, p2p_message.message_type
2046        );
2047
2048        // Check if this is a heartbeat or health check
2049        if let MCPMessage::CallTool { name, arguments } = &p2p_message.payload {
2050            if name == "heartbeat" {
2051                if let Ok(heartbeat) = serde_json::from_value::<Heartbeat>(arguments.clone()) {
2052                    self.handle_heartbeat(heartbeat).await?;
2053                    return Ok(None);
2054                }
2055            } else if name == "health_check" {
2056                // Respond to health check
2057                let health_response = MCPMessage::CallToolResult {
2058                    content: vec![],
2059                    is_error: false,
2060                };
2061
2062                let response_message = P2PMCPMessage {
2063                    message_type: P2PMCPMessageType::Response,
2064                    message_id: p2p_message.message_id.clone(),
2065                    source_peer: source_peer.clone(),
2066                    target_peer: Some(p2p_message.source_peer.clone()),
2067                    timestamp: SystemTime::now()
2068                        .duration_since(SystemTime::UNIX_EPOCH)
2069                        .unwrap_or_else(|_| std::time::Duration::from_secs(0))
2070                        .as_secs(),
2071                    payload: health_response,
2072                    ttl: 3,
2073                };
2074
2075                return Ok(Some(serde_json::to_vec(&response_message)?));
2076            }
2077        }
2078
2079        match p2p_message.message_type {
2080            P2PMCPMessageType::Request => self.handle_remote_request(p2p_message).await,
2081            P2PMCPMessageType::Response => {
2082                self.handle_remote_response(p2p_message).await?;
2083                Ok(None) // Responses don't generate replies
2084            }
2085            P2PMCPMessageType::ServiceAdvertisement => {
2086                self.handle_service_advertisement(p2p_message).await?;
2087                Ok(None)
2088            }
2089            P2PMCPMessageType::ServiceDiscovery => self.handle_service_discovery(p2p_message).await,
2090            P2PMCPMessageType::Heartbeat => {
2091                debug!("Received heartbeat message");
2092                Ok(None)
2093            }
2094            P2PMCPMessageType::HealthCheck => {
2095                debug!("Received health check message");
2096                Ok(None)
2097            }
2098        }
2099    }
2100
2101    /// Handle remote tool call request
2102    async fn handle_remote_request(&self, message: P2PMCPMessage) -> Result<Option<Vec<u8>>> {
2103        match message.payload {
2104            MCPMessage::CallTool { name, arguments } => {
2105                let context = MCPCallContext {
2106                    caller_id: message.source_peer.clone(),
2107                    timestamp: SystemTime::now(),
2108                    timeout: DEFAULT_CALL_TIMEOUT,
2109                    auth_info: None,
2110                    metadata: HashMap::new(),
2111                };
2112
2113                // Call the local tool
2114                let result = self.call_tool(&name, arguments, context).await;
2115
2116                // Create response message
2117                let response_payload = match result {
2118                    Ok(value) => MCPMessage::CallToolResult {
2119                        content: vec![MCPContent::Text {
2120                            text: value.to_string(),
2121                        }],
2122                        is_error: false,
2123                    },
2124                    Err(e) => MCPMessage::Error {
2125                        code: -1,
2126                        message: e.to_string(),
2127                        data: None,
2128                    },
2129                };
2130
2131                let response_message = P2PMCPMessage {
2132                    message_type: P2PMCPMessageType::Response,
2133                    message_id: message.message_id,
2134                    source_peer: self.get_node_id_string(),
2135                    target_peer: Some(message.source_peer),
2136                    timestamp: SystemTime::now()
2137                        .duration_since(std::time::UNIX_EPOCH)
2138                        .map_err(|e| {
2139                            P2PError::Identity(crate::error::IdentityError::SystemTime(
2140                                format!("Time error: {e}").into(),
2141                            ))
2142                        })?
2143                        .as_secs(),
2144                    payload: response_payload,
2145                    ttl: message.ttl.saturating_sub(1),
2146                };
2147
2148                // Serialize response
2149                let response_data = serde_json::to_vec(&response_message)
2150                    .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2151
2152                Ok(Some(response_data))
2153            }
2154            MCPMessage::ListTools { cursor: _ } => {
2155                let (tools, _) = self.list_tools(None).await?;
2156
2157                let response_payload = MCPMessage::ListToolsResult {
2158                    tools,
2159                    next_cursor: None,
2160                };
2161
2162                let response_message = P2PMCPMessage {
2163                    message_type: P2PMCPMessageType::Response,
2164                    message_id: message.message_id,
2165                    source_peer: self.get_node_id_string(),
2166                    target_peer: Some(message.source_peer),
2167                    timestamp: SystemTime::now()
2168                        .duration_since(std::time::UNIX_EPOCH)
2169                        .map_err(|e| {
2170                            P2PError::Identity(crate::error::IdentityError::SystemTime(
2171                                format!("Time error: {e}").into(),
2172                            ))
2173                        })?
2174                        .as_secs(),
2175                    payload: response_payload,
2176                    ttl: message.ttl.saturating_sub(1),
2177                };
2178
2179                let response_data = serde_json::to_vec(&response_message)
2180                    .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2181
2182                Ok(Some(response_data))
2183            }
2184            _ => {
2185                // Unsupported request type
2186                let error_response = P2PMCPMessage {
2187                    message_type: P2PMCPMessageType::Response,
2188                    message_id: message.message_id,
2189                    source_peer: self.get_node_id_string(),
2190                    target_peer: Some(message.source_peer),
2191                    timestamp: SystemTime::now()
2192                        .duration_since(std::time::UNIX_EPOCH)
2193                        .map_err(|e| {
2194                            P2PError::Identity(crate::error::IdentityError::SystemTime(
2195                                format!("Time error: {e}").into(),
2196                            ))
2197                        })?
2198                        .as_secs(),
2199                    payload: MCPMessage::Error {
2200                        code: -2,
2201                        message: "Unsupported request type".to_string(),
2202                        data: None,
2203                    },
2204                    ttl: message.ttl.saturating_sub(1),
2205                };
2206
2207                let response_data = serde_json::to_vec(&error_response)
2208                    .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2209
2210                Ok(Some(response_data))
2211            }
2212        }
2213    }
2214
2215    // Security-related methods
2216
2217    /// Generate authentication token for peer
2218    pub async fn generate_auth_token(
2219        &self,
2220        peer_id: &PeerId,
2221        permissions: Vec<MCPPermission>,
2222        ttl: Duration,
2223    ) -> Result<String> {
2224        if let Some(security_manager) = &self.security_manager {
2225            let token = security_manager
2226                .generate_token(peer_id, permissions, ttl)
2227                .await?;
2228
2229            // Log authentication event
2230            let mut details = HashMap::new();
2231            details.insert("action".to_string(), "token_generated".to_string());
2232            details.insert("ttl_seconds".to_string(), ttl.as_secs().to_string());
2233
2234            self.audit_logger
2235                .log_event(
2236                    "authentication".to_string(),
2237                    peer_id.clone(),
2238                    details,
2239                    AuditSeverity::Info,
2240                )
2241                .await;
2242
2243            Ok(token)
2244        } else {
2245            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2246                "Authentication not enabled".to_string().into(),
2247            )))
2248        }
2249    }
2250
2251    /// Verify authentication token
2252    pub async fn verify_auth_token(&self, token: &str) -> Result<TokenPayload> {
2253        if let Some(security_manager) = &self.security_manager {
2254            match security_manager.verify_token(token).await {
2255                Ok(payload) => {
2256                    // Log successful verification
2257                    let mut details = HashMap::new();
2258                    details.insert("action".to_string(), "token_verified".to_string());
2259                    details.insert("subject".to_string(), payload.sub.clone());
2260
2261                    self.audit_logger
2262                        .log_event(
2263                            "authentication".to_string(),
2264                            payload.iss.clone(),
2265                            details,
2266                            AuditSeverity::Info,
2267                        )
2268                        .await;
2269
2270                    Ok(payload)
2271                }
2272                Err(e) => {
2273                    // Log failed verification
2274                    let mut details = HashMap::new();
2275                    details.insert(
2276                        "action".to_string(),
2277                        "token_verification_failed".to_string(),
2278                    );
2279                    details.insert("error".to_string(), e.to_string());
2280
2281                    self.audit_logger
2282                        .log_event(
2283                            "authentication".to_string(),
2284                            "unknown".to_string(),
2285                            details,
2286                            AuditSeverity::Warning,
2287                        )
2288                        .await;
2289
2290                    Err(e)
2291                }
2292            }
2293        } else {
2294            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2295                "Authentication not enabled".to_string().into(),
2296            )))
2297        }
2298    }
2299
2300    /// Check if peer has permission for operation
2301    pub async fn check_permission(
2302        &self,
2303        peer_id: &PeerId,
2304        permission: &MCPPermission,
2305    ) -> Result<bool> {
2306        if let Some(security_manager) = &self.security_manager {
2307            security_manager.check_permission(peer_id, permission).await
2308        } else {
2309            // If security is disabled, allow all operations
2310            Ok(true)
2311        }
2312    }
2313
2314    /// Check rate limit for peer
2315    pub async fn check_rate_limit(&self, peer_id: &PeerId) -> Result<bool> {
2316        if let Some(security_manager) = &self.security_manager {
2317            let allowed = security_manager.check_rate_limit(peer_id).await?;
2318
2319            if !allowed {
2320                // Log rate limit violation
2321                let mut details = HashMap::new();
2322                details.insert("action".to_string(), "rate_limit_exceeded".to_string());
2323
2324                self.audit_logger
2325                    .log_event(
2326                        "rate_limiting".to_string(),
2327                        peer_id.clone(),
2328                        details,
2329                        AuditSeverity::Warning,
2330                    )
2331                    .await;
2332            }
2333
2334            Ok(allowed)
2335        } else {
2336            // If rate limiting is disabled, allow all requests
2337            Ok(true)
2338        }
2339    }
2340
2341    /// Grant permission to peer
2342    pub async fn grant_permission(
2343        &self,
2344        peer_id: &PeerId,
2345        permission: MCPPermission,
2346    ) -> Result<()> {
2347        if let Some(security_manager) = &self.security_manager {
2348            security_manager
2349                .grant_permission(peer_id, permission.clone())
2350                .await?;
2351
2352            // Log permission grant
2353            let mut details = HashMap::new();
2354            details.insert("action".to_string(), "permission_granted".to_string());
2355            details.insert("permission".to_string(), permission.as_str().to_string());
2356
2357            self.audit_logger
2358                .log_event(
2359                    "authorization".to_string(),
2360                    peer_id.clone(),
2361                    details,
2362                    AuditSeverity::Info,
2363                )
2364                .await;
2365
2366            Ok(())
2367        } else {
2368            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2369                "Security not enabled".to_string().into(),
2370            )))
2371        }
2372    }
2373
2374    /// Revoke permission from peer
2375    pub async fn revoke_permission(
2376        &self,
2377        peer_id: &PeerId,
2378        permission: &MCPPermission,
2379    ) -> Result<()> {
2380        if let Some(security_manager) = &self.security_manager {
2381            security_manager
2382                .revoke_permission(peer_id, permission)
2383                .await?;
2384
2385            // Log permission revocation
2386            let mut details = HashMap::new();
2387            details.insert("action".to_string(), "permission_revoked".to_string());
2388            details.insert("permission".to_string(), permission.as_str().to_string());
2389
2390            self.audit_logger
2391                .log_event(
2392                    "authorization".to_string(),
2393                    peer_id.clone(),
2394                    details,
2395                    AuditSeverity::Info,
2396                )
2397                .await;
2398
2399            Ok(())
2400        } else {
2401            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2402                "Security not enabled".to_string().into(),
2403            )))
2404        }
2405    }
2406
2407    /// Add trusted peer
2408    pub async fn add_trusted_peer(&self, peer_id: PeerId) -> Result<()> {
2409        if let Some(security_manager) = &self.security_manager {
2410            security_manager.add_trusted_peer(peer_id.clone()).await?;
2411
2412            // Log trusted peer addition
2413            let mut details = HashMap::new();
2414            details.insert("action".to_string(), "trusted_peer_added".to_string());
2415
2416            self.audit_logger
2417                .log_event(
2418                    "trust_management".to_string(),
2419                    peer_id,
2420                    details,
2421                    AuditSeverity::Info,
2422                )
2423                .await;
2424
2425            Ok(())
2426        } else {
2427            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2428                "Security not enabled".to_string().into(),
2429            )))
2430        }
2431    }
2432
2433    /// Check if peer is trusted
2434    pub async fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
2435        if let Some(security_manager) = &self.security_manager {
2436            security_manager.is_trusted_peer(peer_id).await
2437        } else {
2438            false
2439        }
2440    }
2441
2442    /// Set security policy for tool
2443    pub async fn set_tool_security_policy(
2444        &self,
2445        tool_name: String,
2446        level: SecurityLevel,
2447    ) -> Result<()> {
2448        if let Some(security_manager) = &self.security_manager {
2449            security_manager
2450                .set_tool_policy(tool_name.clone(), level.clone())
2451                .await?;
2452
2453            // Log policy change
2454            let mut details = HashMap::new();
2455            details.insert("action".to_string(), "tool_policy_set".to_string());
2456            details.insert("tool_name".to_string(), tool_name);
2457            details.insert("security_level".to_string(), format!("{level:?}"));
2458
2459            self.audit_logger
2460                .log_event(
2461                    "security_policy".to_string(),
2462                    "system".to_string(),
2463                    details,
2464                    AuditSeverity::Info,
2465                )
2466                .await;
2467
2468            Ok(())
2469        } else {
2470            Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2471                "Security not enabled".to_string().into(),
2472            )))
2473        }
2474    }
2475
2476    /// Get security policy for tool
2477    pub async fn get_tool_security_policy(&self, tool_name: &str) -> SecurityLevel {
2478        if let Some(security_manager) = &self.security_manager {
2479            security_manager.get_tool_policy(tool_name).await
2480        } else {
2481            SecurityLevel::Public
2482        }
2483    }
2484
2485    /// Get peer security statistics
2486    pub async fn get_peer_security_stats(&self, peer_id: &PeerId) -> Option<PeerACL> {
2487        if let Some(security_manager) = &self.security_manager {
2488            security_manager.get_peer_stats(peer_id).await
2489        } else {
2490            None
2491        }
2492    }
2493
2494    /// Get recent security audit entries
2495    pub async fn get_security_audit(&self, limit: Option<usize>) -> Vec<SecurityAuditEntry> {
2496        self.audit_logger.get_recent_entries(limit).await
2497    }
2498
2499    /// Perform security housekeeping
2500    pub async fn security_cleanup(&self) -> Result<()> {
2501        if let Some(security_manager) = &self.security_manager {
2502            security_manager.cleanup().await?;
2503        }
2504        Ok(())
2505    }
2506
2507    /// Handle remote response
2508    async fn handle_remote_response(&self, message: P2PMCPMessage) -> Result<()> {
2509        // Find the waiting request handler
2510        let response_tx = {
2511            let mut handlers = self.request_handlers.write().await;
2512            handlers.remove(&message.message_id)
2513        };
2514
2515        if let Some(tx) = response_tx {
2516            let response = MCPResponse {
2517                request_id: message.message_id,
2518                message: message.payload,
2519                timestamp: SystemTime::now(),
2520                processing_time: Duration::from_millis(0), // TODO: Calculate actual processing time
2521            };
2522
2523            // Send response to waiting caller
2524            let _ = tx.send(response);
2525        } else {
2526            debug!(
2527                "Received response for unknown request: {}",
2528                message.message_id
2529            );
2530        }
2531
2532        Ok(())
2533    }
2534
2535    /// Announce local services to the network
2536    pub async fn announce_local_services(&self) -> Result<()> {
2537        if let Some(dht) = &self.dht {
2538            // Create local service announcement
2539            let local_service = self.create_local_service_announcement().await?;
2540
2541            // Store in DHT
2542            self.store_service_in_dht(&local_service, dht).await?;
2543
2544            // Broadcast service announcement to connected peers
2545            if let Some(network_sender) = &*self.network_sender.read().await {
2546                self.broadcast_service_announcement(&local_service, network_sender)
2547                    .await?;
2548            }
2549
2550            info!(
2551                "Announced local MCP service with {} tools",
2552                local_service.tools.len()
2553            );
2554        }
2555
2556        Ok(())
2557    }
2558
2559    /// Create a service announcement for our local node
2560    async fn create_local_service_announcement(&self) -> Result<MCPService> {
2561        let tools = self.tools.read().await;
2562        let tool_names: Vec<String> = tools.keys().cloned().collect();
2563
2564        let service = MCPService {
2565            service_id: format!("mcp-{}", self.config.server_name),
2566            node_id: "local".to_string(), // TODO: Get actual peer ID from network layer
2567            tools: tool_names,
2568            capabilities: MCPCapabilities {
2569                experimental: None,
2570                sampling: None,
2571                tools: Some(MCPToolsCapability {
2572                    list_changed: Some(true),
2573                }),
2574                prompts: None,
2575                resources: None,
2576                logging: None,
2577            },
2578            metadata: MCPServiceMetadata {
2579                name: self.config.server_name.clone(),
2580                version: self.config.server_version.clone(),
2581                description: Some("P2P MCP Service".to_string()),
2582                tags: vec!["p2p".to_string(), "mcp".to_string()],
2583                health_status: ServiceHealthStatus::Healthy,
2584                load_metrics: self.get_current_load_metrics().await,
2585            },
2586            registered_at: SystemTime::now(),
2587            endpoint: MCPEndpoint {
2588                protocol: "p2p".to_string(),
2589                address: "local".to_string(), // TODO: Get actual P2P address
2590                port: None,
2591                tls: true,
2592                auth_required: false,
2593            },
2594        };
2595
2596        Ok(service)
2597    }
2598
2599    /// Get current service load metrics
2600    async fn get_current_load_metrics(&self) -> ServiceLoadMetrics {
2601        let stats = self.stats.read().await;
2602
2603        ServiceLoadMetrics {
2604            active_requests: self.request_handlers.read().await.len() as u32,
2605            requests_per_second: stats.total_requests as f64 / 60.0, // Rough estimate
2606            avg_response_time_ms: stats.avg_response_time.as_millis() as f64,
2607            error_rate: if stats.total_requests > 0 {
2608                stats.total_errors as f64 / stats.total_requests as f64
2609            } else {
2610                0.0
2611            },
2612            cpu_usage: 0.0,  // TODO: Get actual CPU usage
2613            memory_usage: 0, // TODO: Get actual memory usage
2614        }
2615    }
2616
2617    /// Store service information in DHT
2618    async fn store_service_in_dht(
2619        &self,
2620        service: &MCPService,
2621        dht: &Arc<RwLock<DHT>>,
2622    ) -> Result<()> {
2623        // Store individual service record
2624        let hash = blake3::hash(format!("mcp:service:{}", service.service_id).as_bytes());
2625        let service_key: Key = *hash.as_bytes();
2626        let service_data = serde_json::to_vec(service)
2627            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2628
2629        let mut dht_guard = dht.write().await;
2630        dht_guard
2631            .store(&DhtKey::from_bytes(service_key), service_data)
2632            .await
2633            .map_err(|e| {
2634                P2PError::Dht(crate::error::DhtError::StoreFailed(
2635                    format!("Failed to store service: {e}").into(),
2636                ))
2637            })?;
2638
2639        // Also add to services index
2640        let hash = blake3::hash(b"mcp:services:index");
2641        let services_index_key: Key = *hash.as_bytes();
2642        let mut service_ids = match dht_guard
2643            .retrieve(&DhtKey::from_bytes(services_index_key))
2644            .await
2645        {
2646            Ok(Some(value)) => serde_json::from_slice::<Vec<String>>(&value).unwrap_or_default(),
2647            Ok(None) | Err(_) => Vec::new(),
2648        };
2649
2650        if !service_ids.contains(&service.service_id) {
2651            service_ids.push(service.service_id.clone());
2652
2653            let index_data = serde_json::to_vec(&service_ids)
2654                .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2655
2656            dht_guard
2657                .store(&DhtKey::from_bytes(services_index_key), index_data)
2658                .await
2659                .map_err(|e| {
2660                    P2PError::Dht(crate::error::DhtError::StoreFailed(
2661                        format!("Failed to update services index: {e}").into(),
2662                    ))
2663                })?;
2664        }
2665
2666        Ok(())
2667    }
2668
2669    /// Broadcast service announcement to connected peers
2670    async fn broadcast_service_announcement(
2671        &self,
2672        service: &MCPService,
2673        network_sender: &Arc<dyn NetworkSender>,
2674    ) -> Result<()> {
2675        let announcement = P2PMCPMessage {
2676            message_type: P2PMCPMessageType::ServiceAdvertisement,
2677            message_id: uuid::Uuid::new_v4().to_string(),
2678            source_peer: network_sender.local_peer_id().clone(),
2679            target_peer: None, // Broadcast to all peers
2680            timestamp: SystemTime::now()
2681                .duration_since(std::time::UNIX_EPOCH)
2682                .unwrap_or_default()
2683                .as_secs(),
2684            payload: MCPMessage::ListToolsResult {
2685                tools: service
2686                    .tools
2687                    .iter()
2688                    .map(|tool_name| MCPTool {
2689                        name: tool_name.clone(),
2690                        description: format!("Tool from {}", service.metadata.name),
2691                        input_schema: json!({"type": "object"}),
2692                    })
2693                    .collect(),
2694                next_cursor: None,
2695            },
2696            ttl: 3,
2697        };
2698
2699        let _announcement_data = serde_json::to_vec(&announcement)
2700            .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2701
2702        // TODO: Broadcast to all connected peers
2703        // For now, this would require getting the list of connected peers from the network layer
2704        debug!(
2705            "Service announcement prepared for broadcast: {} tools",
2706            service.tools.len()
2707        );
2708
2709        Ok(())
2710    }
2711
2712    /// Discover services from other peers
2713    pub async fn discover_remote_services(&self) -> Result<Vec<MCPService>> {
2714        if let Some(dht) = &self.dht {
2715            let hash = blake3::hash(b"mcp:services:index");
2716            let services_index_key: Key = *hash.as_bytes();
2717            let dht_guard = dht.read().await;
2718
2719            let service_ids = match dht_guard
2720                .retrieve(&DhtKey::from_bytes(services_index_key))
2721                .await
2722            {
2723                Ok(Some(value)) => {
2724                    serde_json::from_slice::<Vec<String>>(&value).unwrap_or_default()
2725                }
2726                Ok(None) | Err(_) => {
2727                    debug!("No services index found in DHT");
2728                    return Ok(Vec::new());
2729                }
2730            };
2731
2732            let mut discovered_services = Vec::new();
2733
2734            for service_id in service_ids {
2735                let hash = blake3::hash(format!("mcp:service:{service_id}").as_bytes());
2736                let service_key: Key = *hash.as_bytes();
2737
2738                if let Ok(Some(value)) = dht_guard.retrieve(&DhtKey::from_bytes(service_key)).await
2739                {
2740                    match serde_json::from_slice::<MCPService>(&value) {
2741                        Ok(service) => {
2742                            // Don't include our own service
2743                            if service.service_id != format!("mcp-{}", self.config.server_name) {
2744                                discovered_services.push(service);
2745                            }
2746                        }
2747                        Err(e) => {
2748                            warn!("Failed to deserialize service {}: {}", service_id, e);
2749                        }
2750                    }
2751                }
2752            }
2753
2754            debug!(
2755                "Discovered {} remote MCP services",
2756                discovered_services.len()
2757            );
2758            Ok(discovered_services)
2759        } else {
2760            Ok(Vec::new())
2761        }
2762    }
2763
2764    /// Refresh service discovery and update remote services cache
2765    pub async fn refresh_service_discovery(&self) -> Result<()> {
2766        let discovered_services = self.discover_remote_services().await?;
2767
2768        // Update remote services cache
2769        {
2770            let mut remote_cache = self.remote_services.write().await;
2771            remote_cache.clear();
2772
2773            for service in discovered_services {
2774                remote_cache.insert(service.service_id.clone(), service);
2775            }
2776        }
2777
2778        // Also update local services registry
2779        {
2780            let local_service = self.create_local_service_announcement().await?;
2781            let mut local_cache = self.local_services.write().await;
2782            local_cache.insert(local_service.service_id.clone(), local_service);
2783        }
2784
2785        debug!("Service discovery refresh completed");
2786        Ok(())
2787    }
2788
2789    /// Get all known services (local + remote)
2790    pub async fn get_all_services(&self) -> Result<Vec<MCPService>> {
2791        let mut all_services = Vec::new();
2792
2793        // Add local services
2794        {
2795            let local_services = self.local_services.read().await;
2796            all_services.extend(local_services.values().cloned());
2797        }
2798
2799        // Add remote services
2800        {
2801            let remote_services = self.remote_services.read().await;
2802            all_services.extend(remote_services.values().cloned());
2803        }
2804
2805        Ok(all_services)
2806    }
2807
2808    /// Find services that provide a specific tool
2809    pub async fn find_services_with_tool(&self, tool_name: &str) -> Result<Vec<MCPService>> {
2810        let all_services = self.get_all_services().await?;
2811
2812        let matching_services = all_services
2813            .into_iter()
2814            .filter(|service| service.tools.contains(&tool_name.to_string()))
2815            .collect();
2816
2817        Ok(matching_services)
2818    }
2819
2820    /// Handle service advertisement
2821    pub async fn handle_service_advertisement(&self, message: P2PMCPMessage) -> Result<()> {
2822        debug!(
2823            "Received service advertisement from peer: {}",
2824            message.source_peer
2825        );
2826
2827        // Extract tools from the advertisement
2828        if let MCPMessage::ListToolsResult { tools, .. } = message.payload {
2829            // Create a service record from the advertisement
2830            let service = MCPService {
2831                service_id: format!("mcp-{}", message.source_peer),
2832                node_id: message.source_peer.clone(),
2833                tools: tools.iter().map(|t| t.name.clone()).collect(),
2834                capabilities: MCPCapabilities {
2835                    experimental: None,
2836                    sampling: None,
2837                    tools: Some(MCPToolsCapability {
2838                        list_changed: Some(true),
2839                    }),
2840                    prompts: None,
2841                    resources: None,
2842                    logging: None,
2843                },
2844                metadata: MCPServiceMetadata {
2845                    name: format!("Remote MCP Service - {}", message.source_peer),
2846                    version: "unknown".to_string(),
2847                    description: Some("Remote P2P MCP Service".to_string()),
2848                    tags: vec!["p2p".to_string(), "remote".to_string()],
2849                    health_status: ServiceHealthStatus::Healthy,
2850                    load_metrics: ServiceLoadMetrics {
2851                        active_requests: 0,
2852                        requests_per_second: 0.0,
2853                        avg_response_time_ms: 0.0,
2854                        error_rate: 0.0,
2855                        cpu_usage: 0.0,
2856                        memory_usage: 0,
2857                    },
2858                },
2859                registered_at: SystemTime::now(),
2860                endpoint: MCPEndpoint {
2861                    protocol: "p2p".to_string(),
2862                    address: message.source_peer.clone(),
2863                    port: None,
2864                    tls: true,
2865                    auth_required: false,
2866                },
2867            };
2868
2869            // Update remote services cache
2870            {
2871                let mut remote_services = self.remote_services.write().await;
2872                remote_services.insert(service.service_id.clone(), service.clone());
2873            }
2874
2875            // Store in DHT if available
2876            if let Some(dht) = &self.dht
2877                && let Err(e) = self.store_service_in_dht(&service, dht).await
2878            {
2879                warn!("Failed to store remote service in DHT: {}", e);
2880            }
2881
2882            info!(
2883                "Registered remote MCP service from {} with {} tools",
2884                message.source_peer,
2885                tools.len()
2886            );
2887        }
2888
2889        Ok(())
2890    }
2891
2892    /// Handle service discovery request
2893    pub async fn handle_service_discovery(
2894        &self,
2895        message: P2PMCPMessage,
2896    ) -> Result<Option<Vec<u8>>> {
2897        // Create service advertisement with our local services
2898        let local_services: Vec<MCPService> = {
2899            let services = self.local_services.read().await;
2900            services.values().cloned().collect()
2901        };
2902
2903        if !local_services.is_empty() {
2904            let advertisement = P2PMCPMessage {
2905                message_type: P2PMCPMessageType::ServiceAdvertisement,
2906                message_id: uuid::Uuid::new_v4().to_string(),
2907                source_peer: self.get_node_id_string(),
2908                target_peer: Some(message.source_peer),
2909                timestamp: SystemTime::now()
2910                    .duration_since(std::time::UNIX_EPOCH)
2911                    .map_err(|e| {
2912                        P2PError::Identity(crate::error::IdentityError::SystemTime(
2913                            format!("Time error: {e}").into(),
2914                        ))
2915                    })?
2916                    .as_secs(),
2917                payload: MCPMessage::ListToolsResult {
2918                    tools: local_services
2919                        .into_iter()
2920                        .flat_map(|s| {
2921                            s.tools.into_iter().map(|t| MCPTool {
2922                                name: t,
2923                                description: "Remote tool".to_string(),
2924                                input_schema: json!({"type": "object"}),
2925                            })
2926                        })
2927                        .collect(),
2928                    next_cursor: None,
2929                },
2930                ttl: message.ttl.saturating_sub(1),
2931            };
2932
2933            let response_data = serde_json::to_vec(&advertisement)
2934                .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2935
2936            Ok(Some(response_data))
2937        } else {
2938            Ok(None)
2939        }
2940    }
2941
2942    /// Shutdown the server
2943    pub async fn shutdown(&self) -> Result<()> {
2944        info!("Shutting down MCP server");
2945
2946        // Close all sessions
2947        {
2948            let mut sessions = self.sessions.write().await;
2949            for session in sessions.values_mut() {
2950                session.state = MCPSessionState::Terminated;
2951            }
2952            sessions.clear();
2953        }
2954
2955        // TODO: Cleanup tasks and channels
2956
2957        info!("MCP server shutdown complete");
2958        Ok(())
2959    }
2960}
2961
2962impl Tool {
2963    /// Create a tool builder
2964    pub fn builder(name: &str, description: &str, input_schema: Value) -> ToolBuilder {
2965        ToolBuilder {
2966            name: name.to_string(),
2967            description: description.to_string(),
2968            input_schema,
2969            handler: None,
2970            tags: Vec::new(),
2971        }
2972    }
2973}
2974
2975/// Builder for creating tools
2976pub struct ToolBuilder {
2977    name: String,
2978    description: String,
2979    input_schema: Value,
2980    handler: Option<Box<dyn ToolHandler + Send + Sync>>,
2981    tags: Vec<String>,
2982}
2983
2984impl ToolBuilder {
2985    /// Set tool handler
2986    pub fn handler<H: ToolHandler + Send + Sync + 'static>(mut self, handler: H) -> Self {
2987        self.handler = Some(Box::new(handler));
2988        self
2989    }
2990
2991    /// Add tags
2992    pub fn tags(mut self, tags: Vec<String>) -> Self {
2993        self.tags = tags;
2994        self
2995    }
2996
2997    /// Build the tool
2998    pub fn build(self) -> Result<Tool> {
2999        let handler = self.handler.ok_or_else(|| {
3000            P2PError::Mcp(crate::error::McpError::InvalidRequest(
3001                "Tool handler is required".to_string().into(),
3002            ))
3003        })?;
3004
3005        let definition = MCPTool {
3006            name: self.name,
3007            description: self.description,
3008            input_schema: self.input_schema,
3009        };
3010
3011        let metadata = ToolMetadata {
3012            created_at: SystemTime::now(),
3013            last_called: None,
3014            call_count: 0,
3015            avg_execution_time: Duration::from_millis(0),
3016            health_status: ToolHealthStatus::Healthy,
3017            tags: self.tags,
3018        };
3019
3020        Ok(Tool {
3021            definition,
3022            handler,
3023            metadata,
3024        })
3025    }
3026}
3027
3028/// Simple function-based tool handler
3029pub struct FunctionToolHandler<F> {
3030    function: F,
3031}
3032
3033impl<F, Fut> ToolHandler for FunctionToolHandler<F>
3034where
3035    F: Fn(Value) -> Fut + Send + Sync,
3036    Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
3037{
3038    fn execute(
3039        &self,
3040        arguments: Value,
3041    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + '_>> {
3042        Box::pin((self.function)(arguments))
3043    }
3044}
3045
3046impl<F> FunctionToolHandler<F> {
3047    /// Create a new function-based tool handler
3048    pub fn new(function: F) -> Self {
3049        Self { function }
3050    }
3051}
3052
3053/// MCP service descriptor for discovery and routing
3054impl MCPService {
3055    /// Create a new MCP service descriptor
3056    pub fn new(service_id: String, node_id: PeerId) -> Self {
3057        Self {
3058            service_id,
3059            node_id,
3060            tools: Vec::new(),
3061            capabilities: MCPCapabilities {
3062                experimental: None,
3063                sampling: None,
3064                tools: Some(MCPToolsCapability {
3065                    list_changed: Some(true),
3066                }),
3067                prompts: None,
3068                resources: None,
3069                logging: None,
3070            },
3071            metadata: MCPServiceMetadata {
3072                name: "MCP Service".to_string(),
3073                version: "1.0.0".to_string(),
3074                description: None,
3075                tags: Vec::new(),
3076                health_status: ServiceHealthStatus::Healthy,
3077                load_metrics: ServiceLoadMetrics {
3078                    active_requests: 0,
3079                    requests_per_second: 0.0,
3080                    avg_response_time_ms: 0.0,
3081                    error_rate: 0.0,
3082                    cpu_usage: 0.0,
3083                    memory_usage: 0,
3084                },
3085            },
3086            registered_at: SystemTime::now(),
3087            endpoint: MCPEndpoint {
3088                protocol: "p2p".to_string(),
3089                address: "".to_string(),
3090                port: None,
3091                tls: false,
3092                auth_required: false,
3093            },
3094        }
3095    }
3096}
3097
3098impl Default for MCPCapabilities {
3099    fn default() -> Self {
3100        Self {
3101            experimental: None,
3102            sampling: None,
3103            tools: Some(MCPToolsCapability {
3104                list_changed: Some(true),
3105            }),
3106            prompts: Some(MCPPromptsCapability {
3107                list_changed: Some(true),
3108            }),
3109            resources: Some(MCPResourcesCapability {
3110                subscribe: Some(true),
3111                list_changed: Some(true),
3112            }),
3113            logging: Some(MCPLoggingCapability {
3114                levels: Some(vec![
3115                    MCPLogLevel::Debug,
3116                    MCPLogLevel::Info,
3117                    MCPLogLevel::Warning,
3118                    MCPLogLevel::Error,
3119                ]),
3120            }),
3121        }
3122    }
3123}
3124
3125#[cfg(test)]
3126mod tests {
3127    use super::*;
3128    use crate::dht::DHT;
3129    use std::future::Future;
3130    use std::pin::Pin;
3131    use std::time::UNIX_EPOCH;
3132    use tokio::time::timeout;
3133
3134    /// Test implementation of ToolHandler for unit tests
3135    struct TestTool {
3136        name: String,
3137        should_error: bool,
3138        execution_time: Duration,
3139    }
3140
3141    impl TestTool {
3142        fn new(name: &str) -> Self {
3143            Self {
3144                name: name.to_string(),
3145                should_error: false,
3146                execution_time: Duration::from_millis(10),
3147            }
3148        }
3149
3150        fn with_error(mut self) -> Self {
3151            self.should_error = true;
3152            self
3153        }
3154
3155        fn with_execution_time(mut self, duration: Duration) -> Self {
3156            self.execution_time = duration;
3157            self
3158        }
3159    }
3160
3161    impl ToolHandler for TestTool {
3162        fn execute(
3163            &self,
3164            arguments: Value,
3165        ) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
3166            let should_error = self.should_error;
3167            let execution_time = self.execution_time;
3168            let name = self.name.clone();
3169
3170            Box::pin(async move {
3171                tokio::time::sleep(execution_time).await;
3172
3173                if should_error {
3174                    return Err(P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
3175                        format!("{}: Test error", name).into(),
3176                    )));
3177                }
3178
3179                // Echo back the arguments with a response marker
3180                Ok(json!({
3181                    "tool": name,
3182                    "arguments": arguments,
3183                    "result": "success"
3184                }))
3185            })
3186        }
3187
3188        fn validate(&self, arguments: &Value) -> Result<()> {
3189            if !arguments.is_object() {
3190                return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
3191                    "Arguments must be an object".to_string().into(),
3192                )));
3193            }
3194            Ok(())
3195        }
3196
3197        fn get_requirements(&self) -> ToolRequirements {
3198            ToolRequirements {
3199                max_memory: Some(1024 * 1024), // 1MB
3200                max_execution_time: Some(Duration::from_secs(5)),
3201                required_capabilities: vec!["test".to_string()],
3202                requires_network: false,
3203                requires_filesystem: false,
3204            }
3205        }
3206    }
3207
3208    /// Helper function to create a test MCP server
3209    async fn create_test_mcp_server() -> MCPServer {
3210        let config = MCPServerConfig {
3211            server_name: "test_server".to_string(),
3212            server_version: "1.0.0".to_string(),
3213            enable_auth: false,
3214            enable_rate_limiting: false,
3215            max_concurrent_requests: 10,
3216            request_timeout: Duration::from_secs(30),
3217            enable_dht_discovery: true,
3218            rate_limit_rpm: 60,
3219            enable_logging: true,
3220            max_tool_execution_time: Duration::from_secs(30),
3221            tool_memory_limit: 100 * 1024 * 1024,
3222            health_monitor: HealthMonitorConfig::default(),
3223        };
3224
3225        MCPServer::new(config)
3226    }
3227
3228    /// Helper function to create a test tool
3229    fn create_test_tool(name: &str) -> Tool {
3230        Tool {
3231            definition: MCPTool {
3232                name: name.to_string(),
3233                description: format!("Test tool: {}", name).into(),
3234                input_schema: json!({
3235                    "type": "object",
3236                    "properties": {
3237                        "input": { "type": "string" }
3238                    }
3239                }),
3240            },
3241            handler: Box::new(TestTool::new(name)),
3242            metadata: ToolMetadata {
3243                created_at: SystemTime::now(),
3244                last_called: None,
3245                call_count: 0,
3246                avg_execution_time: Duration::from_millis(0),
3247                health_status: ToolHealthStatus::Healthy,
3248                tags: vec!["test".to_string()],
3249            },
3250        }
3251    }
3252
3253    /// Helper function to create a test DHT
3254    async fn create_test_dht() -> DHT {
3255        use crate::dht::core_engine::NodeId;
3256        let hash = blake3::hash(b"test_node_id");
3257        let local_id = NodeId::from_bytes(*hash.as_bytes());
3258        DHT::new(local_id).expect("Failed to create test DHT")
3259    }
3260
3261    /// Helper function to create an MCP call context
3262    fn create_test_context(caller_id: PeerId) -> MCPCallContext {
3263        MCPCallContext {
3264            caller_id,
3265            timestamp: SystemTime::now(),
3266            timeout: Duration::from_secs(30),
3267            auth_info: None,
3268            metadata: HashMap::new(),
3269        }
3270    }
3271
3272    #[tokio::test]
3273    async fn test_mcp_server_creation() {
3274        let server = create_test_mcp_server().await;
3275        assert_eq!(server.config.server_name, "test_server");
3276        assert_eq!(server.config.server_version, "1.0.0");
3277        assert!(!server.config.enable_auth);
3278        assert!(!server.config.enable_rate_limiting);
3279    }
3280
3281    #[tokio::test]
3282    async fn test_tool_registration() -> Result<()> {
3283        let server = create_test_mcp_server().await;
3284        let tool = create_test_tool("test_calculator");
3285
3286        // Register the tool
3287        server.register_tool(tool).await?;
3288
3289        // Verify tool is registered
3290        let tools = server.tools.read().await;
3291        assert!(tools.contains_key("test_calculator"));
3292        assert_eq!(
3293            tools
3294                .get("test_calculator")
3295                .expect("Should succeed in test")
3296                .definition
3297                .name,
3298            "test_calculator"
3299        );
3300
3301        // Verify stats updated
3302        let stats = server.stats.read().await;
3303        assert_eq!(stats.total_tools, 1);
3304
3305        Ok(())
3306    }
3307
3308    #[tokio::test]
3309    async fn test_tool_registration_duplicate() -> Result<()> {
3310        let server = create_test_mcp_server().await;
3311        let tool1 = create_test_tool("duplicate_tool");
3312        let tool2 = create_test_tool("duplicate_tool");
3313
3314        // Register first tool
3315        server.register_tool(tool1).await?;
3316
3317        // Try to register duplicate - should fail
3318        let result = server.register_tool(tool2).await;
3319        assert!(result.is_err());
3320        assert!(
3321            result
3322                .unwrap_err()
3323                .to_string()
3324                .contains("Tool already exists")
3325        );
3326
3327        Ok(())
3328    }
3329
3330    #[tokio::test]
3331    async fn test_tool_validation() {
3332        let server = create_test_mcp_server().await;
3333
3334        // Test invalid tool name (empty)
3335        let mut invalid_tool = create_test_tool("");
3336        let result = server.validate_tool(&invalid_tool).await;
3337        assert!(result.is_err());
3338
3339        // Test invalid tool name (too long)
3340        invalid_tool.definition.name = "a".repeat(200);
3341        let result = server.validate_tool(&invalid_tool).await;
3342        assert!(result.is_err());
3343
3344        // Test invalid schema (not an object)
3345        let mut invalid_schema_tool = create_test_tool("valid_name");
3346        invalid_schema_tool.definition.input_schema = json!("not an object");
3347        let result = server.validate_tool(&invalid_schema_tool).await;
3348        assert!(result.is_err());
3349
3350        // Test valid tool
3351        let valid_tool = create_test_tool("valid_tool");
3352        let result = server.validate_tool(&valid_tool).await;
3353        assert!(result.is_ok());
3354    }
3355
3356    #[tokio::test]
3357    async fn test_tool_call_success() -> Result<()> {
3358        let server = create_test_mcp_server().await;
3359        let tool = create_test_tool("success_tool");
3360        server.register_tool(tool).await?;
3361
3362        let caller_id = "test_peer_123".to_string();
3363        let context = create_test_context(caller_id);
3364        let arguments = json!({"input": "test data"});
3365
3366        let result = server
3367            .call_tool("success_tool", arguments.clone(), context)
3368            .await?;
3369
3370        // Verify response structure
3371        assert_eq!(result["tool"], "success_tool");
3372        assert_eq!(result["arguments"], arguments);
3373        assert_eq!(result["result"], "success");
3374
3375        // Verify tool metadata updated
3376        let tools = server.tools.read().await;
3377        let tool_metadata = &tools
3378            .get("success_tool")
3379            .ok_or_else(|| {
3380                P2PError::Mcp(crate::error::McpError::ToolNotFound(
3381                    "Tool not found".into(),
3382                ))
3383            })?
3384            .metadata;
3385        assert_eq!(tool_metadata.call_count, 1);
3386        assert!(tool_metadata.last_called.is_some());
3387
3388        Ok(())
3389    }
3390
3391    #[tokio::test]
3392    async fn test_tool_call_nonexistent() -> Result<()> {
3393        let server = create_test_mcp_server().await;
3394        let caller_id = "test_peer_456".to_string();
3395        let context = create_test_context(caller_id);
3396        let arguments = json!({"input": "test"});
3397
3398        let result = server
3399            .call_tool("nonexistent_tool", arguments, context)
3400            .await;
3401        assert!(result.is_err());
3402        assert!(result.unwrap_err().to_string().contains("Tool not found"));
3403
3404        Ok(())
3405    }
3406
3407    #[tokio::test]
3408    async fn test_tool_call_handler_error() -> Result<()> {
3409        let server = create_test_mcp_server().await;
3410        let tool = Tool {
3411            definition: MCPTool {
3412                name: "error_tool".to_string(),
3413                description: "Tool that always errors".to_string(),
3414                input_schema: json!({"type": "object"}),
3415            },
3416            handler: Box::new(TestTool::new("error_tool").with_error()),
3417            metadata: ToolMetadata {
3418                created_at: SystemTime::now(),
3419                last_called: None,
3420                call_count: 0,
3421                avg_execution_time: Duration::from_millis(0),
3422                health_status: ToolHealthStatus::Healthy,
3423                tags: vec![],
3424            },
3425        };
3426
3427        server.register_tool(tool).await?;
3428
3429        let caller_id = "test_peer_error".to_string();
3430        let context = create_test_context(caller_id);
3431        let arguments = json!({"input": "test"});
3432
3433        let result = server.call_tool("error_tool", arguments, context).await;
3434        assert!(result.is_err());
3435        assert!(
3436            result
3437                .unwrap_err()
3438                .to_string()
3439                .contains("Test error from tool error_tool")
3440        );
3441
3442        Ok(())
3443    }
3444
3445    #[tokio::test]
3446    async fn test_tool_call_timeout() -> Result<()> {
3447        let server = create_test_mcp_server().await;
3448        let slow_tool = Tool {
3449            definition: MCPTool {
3450                name: "slow_tool".to_string(),
3451                description: "Tool that takes too long".to_string(),
3452                input_schema: json!({"type": "object"}),
3453            },
3454            handler: Box::new(
3455                TestTool::new("slow_tool").with_execution_time(Duration::from_secs(2)),
3456            ),
3457            metadata: ToolMetadata {
3458                created_at: SystemTime::now(),
3459                last_called: None,
3460                call_count: 0,
3461                avg_execution_time: Duration::from_millis(0),
3462                health_status: ToolHealthStatus::Healthy,
3463                tags: vec![],
3464            },
3465        };
3466
3467        server.register_tool(slow_tool).await?;
3468
3469        let caller_id = "test_peer_error".to_string();
3470        let context = create_test_context(caller_id);
3471        let arguments = json!({"input": "test"});
3472
3473        // Test with very short timeout
3474        let result = timeout(
3475            Duration::from_millis(100),
3476            server.call_tool("slow_tool", arguments, context),
3477        )
3478        .await;
3479
3480        assert!(result.is_err()); // Should timeout
3481
3482        Ok(())
3483    }
3484
3485    #[tokio::test]
3486    async fn test_tool_requirements() {
3487        let tool = TestTool::new("req_tool");
3488        let requirements = tool.get_requirements();
3489
3490        assert_eq!(requirements.max_memory, Some(1024 * 1024));
3491        assert_eq!(
3492            requirements.max_execution_time,
3493            Some(Duration::from_secs(5))
3494        );
3495        assert_eq!(requirements.required_capabilities, vec!["test"]);
3496        assert!(!requirements.requires_network);
3497        assert!(!requirements.requires_filesystem);
3498    }
3499
3500    #[tokio::test]
3501    async fn test_tool_validation_handler() {
3502        let tool = TestTool::new("validation_tool");
3503
3504        // Valid arguments (object)
3505        let valid_args = json!({"key": "value"});
3506        assert!(tool.validate(&valid_args).is_ok());
3507
3508        // Invalid arguments (not an object)
3509        let invalid_args = json!("not an object");
3510        assert!(tool.validate(&invalid_args).is_err());
3511
3512        let invalid_args = json!(123);
3513        assert!(tool.validate(&invalid_args).is_err());
3514    }
3515
3516    #[tokio::test]
3517    async fn test_tool_health_status() {
3518        let mut metadata = ToolMetadata {
3519            created_at: SystemTime::now(),
3520            last_called: None,
3521            call_count: 0,
3522            avg_execution_time: Duration::from_millis(0),
3523            health_status: ToolHealthStatus::Healthy,
3524            tags: vec![],
3525        };
3526
3527        // Test different health statuses
3528        assert_eq!(metadata.health_status, ToolHealthStatus::Healthy);
3529
3530        metadata.health_status = ToolHealthStatus::Degraded;
3531        assert_eq!(metadata.health_status, ToolHealthStatus::Degraded);
3532
3533        metadata.health_status = ToolHealthStatus::Unhealthy;
3534        assert_eq!(metadata.health_status, ToolHealthStatus::Unhealthy);
3535
3536        metadata.health_status = ToolHealthStatus::Disabled;
3537        assert_eq!(metadata.health_status, ToolHealthStatus::Disabled);
3538    }
3539
3540    #[tokio::test]
3541    async fn test_mcp_capabilities() {
3542        let server = create_test_mcp_server().await;
3543        let capabilities = server.get_server_capabilities().await;
3544
3545        assert!(capabilities.tools.is_some());
3546        assert!(capabilities.prompts.is_some());
3547        assert!(capabilities.resources.is_some());
3548        assert!(capabilities.logging.is_some());
3549
3550        let tools_cap = capabilities.tools.expect("Test assertion failed");
3551        assert_eq!(tools_cap.list_changed, Some(true));
3552
3553        let logging_cap = capabilities.logging.expect("Test assertion failed");
3554        let levels = logging_cap.levels.expect("Test assertion failed");
3555        assert!(levels.contains(&MCPLogLevel::Debug));
3556        assert!(levels.contains(&MCPLogLevel::Info));
3557        assert!(levels.contains(&MCPLogLevel::Warning));
3558        assert!(levels.contains(&MCPLogLevel::Error));
3559    }
3560
3561    #[tokio::test]
3562    async fn test_mcp_message_serialization() {
3563        // Test Initialize message
3564        let init_msg = MCPMessage::Initialize {
3565            protocol_version: MCP_VERSION.to_string(),
3566            capabilities: MCPCapabilities {
3567                experimental: None,
3568                sampling: None,
3569                tools: Some(MCPToolsCapability {
3570                    list_changed: Some(true),
3571                }),
3572                prompts: None,
3573                resources: None,
3574                logging: None,
3575            },
3576            client_info: MCPClientInfo {
3577                name: "test_client".to_string(),
3578                version: "1.0.0".to_string(),
3579            },
3580        };
3581
3582        let serialized = serde_json::to_string(&init_msg).expect("Test assertion failed");
3583        let deserialized: MCPMessage =
3584            serde_json::from_str(&serialized).expect("Test assertion failed");
3585
3586        match deserialized {
3587            MCPMessage::Initialize {
3588                protocol_version,
3589                client_info,
3590                ..
3591            } => {
3592                assert_eq!(protocol_version, MCP_VERSION);
3593                assert_eq!(client_info.name, "test_client");
3594                assert_eq!(client_info.version, "1.0.0");
3595            }
3596            _ => panic!("Wrong message type after deserialization"),
3597        }
3598    }
3599
3600    #[tokio::test]
3601    async fn test_mcp_content_types() {
3602        // Test text content
3603        let text_content = MCPContent::Text {
3604            text: "Hello, world!".to_string(),
3605        };
3606
3607        let serialized = serde_json::to_string(&text_content).expect("Test assertion failed");
3608        let deserialized: MCPContent =
3609            serde_json::from_str(&serialized).expect("Test assertion failed");
3610
3611        match deserialized {
3612            MCPContent::Text { text } => assert_eq!(text, "Hello, world!"),
3613            _ => panic!("Wrong content type"),
3614        }
3615
3616        // Test image content
3617        let image_content = MCPContent::Image {
3618            data: "base64data".to_string(),
3619            mime_type: "image/png".to_string(),
3620        };
3621
3622        let serialized = serde_json::to_string(&image_content).expect("Test assertion failed");
3623        let deserialized: MCPContent =
3624            serde_json::from_str(&serialized).expect("Test assertion failed");
3625
3626        match deserialized {
3627            MCPContent::Image { data, mime_type } => {
3628                assert_eq!(data, "base64data");
3629                assert_eq!(mime_type, "image/png");
3630            }
3631            _ => panic!("Wrong content type"),
3632        }
3633    }
3634
3635    #[tokio::test]
3636    async fn test_service_health_status() {
3637        let mut metrics = ServiceLoadMetrics {
3638            active_requests: 0,
3639            requests_per_second: 0.0,
3640            avg_response_time_ms: 0.0,
3641            error_rate: 0.0,
3642            cpu_usage: 0.0,
3643            memory_usage: 0,
3644        };
3645
3646        // Test healthy service
3647        let metadata = MCPServiceMetadata {
3648            name: "test_service".to_string(),
3649            version: "1.0.0".to_string(),
3650            description: Some("Test service".to_string()),
3651            tags: vec!["test".to_string()],
3652            health_status: ServiceHealthStatus::Healthy,
3653            load_metrics: metrics.clone(),
3654        };
3655
3656        assert_eq!(metadata.health_status, ServiceHealthStatus::Healthy);
3657
3658        // Test different health statuses
3659        metrics.error_rate = 0.5; // 50% error rate
3660        let degraded_metadata = MCPServiceMetadata {
3661            health_status: ServiceHealthStatus::Degraded,
3662            load_metrics: metrics.clone(),
3663            ..metadata.clone()
3664        };
3665
3666        assert_eq!(
3667            degraded_metadata.health_status,
3668            ServiceHealthStatus::Degraded
3669        );
3670
3671        let unhealthy_metadata = MCPServiceMetadata {
3672            health_status: ServiceHealthStatus::Unhealthy,
3673            ..metadata.clone()
3674        };
3675
3676        assert_eq!(
3677            unhealthy_metadata.health_status,
3678            ServiceHealthStatus::Unhealthy
3679        );
3680    }
3681
3682    #[tokio::test]
3683    async fn test_p2p_mcp_message() {
3684        let source_peer = "source_peer_123".to_string();
3685        let target_peer = "target_peer_456".to_string();
3686
3687        let p2p_message = P2PMCPMessage {
3688            message_type: P2PMCPMessageType::Request,
3689            message_id: uuid::Uuid::new_v4().to_string(),
3690            source_peer: source_peer.clone(),
3691            target_peer: Some(target_peer.clone()),
3692            timestamp: SystemTime::now()
3693                .duration_since(UNIX_EPOCH)
3694                .unwrap_or_else(|_| Duration::from_secs(0))
3695                .as_secs(),
3696            payload: MCPMessage::ListTools { cursor: None },
3697            ttl: 10,
3698        };
3699
3700        // Test serialization
3701        let serialized = serde_json::to_string(&p2p_message).expect("Test assertion failed");
3702        let deserialized: P2PMCPMessage =
3703            serde_json::from_str(&serialized).expect("Test assertion failed");
3704
3705        assert_eq!(deserialized.message_type, P2PMCPMessageType::Request);
3706        assert_eq!(deserialized.source_peer, source_peer);
3707        assert_eq!(deserialized.target_peer, Some(target_peer));
3708        assert_eq!(deserialized.ttl, 10);
3709
3710        match deserialized.payload {
3711            MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3712            _ => panic!("Wrong message payload type"),
3713        }
3714    }
3715
3716    #[tokio::test]
3717    async fn test_tool_requirements_default() {
3718        let default_requirements = ToolRequirements::default();
3719
3720        assert_eq!(default_requirements.max_memory, Some(100 * 1024 * 1024));
3721        assert_eq!(
3722            default_requirements.max_execution_time,
3723            Some(Duration::from_secs(30))
3724        );
3725        assert!(default_requirements.required_capabilities.is_empty());
3726        assert!(!default_requirements.requires_network);
3727        assert!(!default_requirements.requires_filesystem);
3728    }
3729
3730    #[tokio::test]
3731    async fn test_mcp_server_stats() -> Result<()> {
3732        let server = create_test_mcp_server().await;
3733
3734        // Initial stats should be zero
3735        let stats = server.stats.read().await;
3736        assert_eq!(stats.total_tools, 0);
3737        assert_eq!(stats.total_requests, 0);
3738        assert_eq!(stats.total_responses, 0);
3739        assert_eq!(stats.total_errors, 0);
3740
3741        drop(stats);
3742
3743        // Register a tool and verify stats update
3744        let tool = create_test_tool("stats_test_tool");
3745        server.register_tool(tool).await?;
3746
3747        let stats = server.stats.read().await;
3748        assert_eq!(stats.total_tools, 1);
3749
3750        Ok(())
3751    }
3752
3753    #[tokio::test]
3754    async fn test_log_levels() {
3755        // Test all log levels serialize/deserialize correctly
3756        let levels = vec![
3757            MCPLogLevel::Debug,
3758            MCPLogLevel::Info,
3759            MCPLogLevel::Notice,
3760            MCPLogLevel::Warning,
3761            MCPLogLevel::Error,
3762            MCPLogLevel::Critical,
3763            MCPLogLevel::Alert,
3764            MCPLogLevel::Emergency,
3765        ];
3766
3767        for level in levels {
3768            let serialized = serde_json::to_string(&level).expect("Test assertion failed");
3769            let deserialized: MCPLogLevel =
3770                serde_json::from_str(&serialized).expect("Test assertion failed");
3771            assert_eq!(level as u8, deserialized as u8);
3772        }
3773    }
3774
3775    #[tokio::test]
3776    async fn test_mcp_endpoint() {
3777        let endpoint = MCPEndpoint {
3778            protocol: "p2p".to_string(),
3779            address: "127.0.0.1".to_string(),
3780            port: Some(9000),
3781            tls: true,
3782            auth_required: true,
3783        };
3784
3785        let serialized = serde_json::to_string(&endpoint).expect("Test assertion failed");
3786        let deserialized: MCPEndpoint =
3787            serde_json::from_str(&serialized).expect("Test assertion failed");
3788
3789        assert_eq!(deserialized.protocol, "p2p");
3790        assert_eq!(deserialized.address, "127.0.0.1");
3791        assert_eq!(deserialized.port, Some(9000));
3792        assert!(deserialized.tls);
3793        assert!(deserialized.auth_required);
3794    }
3795
3796    #[tokio::test]
3797    async fn test_mcp_service_metadata() {
3798        let load_metrics = ServiceLoadMetrics {
3799            active_requests: 5,
3800            requests_per_second: 10.5,
3801            avg_response_time_ms: 250.0,
3802            error_rate: 0.01,
3803            cpu_usage: 45.5,
3804            memory_usage: 1024 * 1024 * 100, // 100MB
3805        };
3806
3807        let metadata = MCPServiceMetadata {
3808            name: "test_service".to_string(),
3809            version: "2.1.0".to_string(),
3810            description: Some("A test service for unit testing".to_string()),
3811            tags: vec!["test".to_string(), "unit".to_string(), "mcp".to_string()],
3812            health_status: ServiceHealthStatus::Healthy,
3813            load_metrics,
3814        };
3815
3816        // Test serialization
3817        let serialized = serde_json::to_string(&metadata).expect("Test assertion failed");
3818        let deserialized: MCPServiceMetadata =
3819            serde_json::from_str(&serialized).expect("Test assertion failed");
3820
3821        assert_eq!(deserialized.name, "test_service");
3822        assert_eq!(deserialized.version, "2.1.0");
3823        assert_eq!(
3824            deserialized.description,
3825            Some("A test service for unit testing".to_string())
3826        );
3827        assert_eq!(deserialized.tags, vec!["test", "unit", "mcp"]);
3828        assert_eq!(deserialized.health_status, ServiceHealthStatus::Healthy);
3829        assert_eq!(deserialized.load_metrics.active_requests, 5);
3830        assert_eq!(deserialized.load_metrics.requests_per_second, 10.5);
3831    }
3832
3833    #[tokio::test]
3834    async fn test_function_tool_handler() -> Result<()> {
3835        // Test function tool handler creation and execution
3836        let handler = FunctionToolHandler::new(|args: Value| async move {
3837            let name = args.get("name").and_then(|v| v.as_str()).unwrap_or("world");
3838            Ok(json!({"greeting": format!("Hello, {}!", name)}))
3839        });
3840
3841        let args = json!({"name": "Alice"});
3842        let result = handler.execute(args).await?;
3843        assert_eq!(result["greeting"], "Hello, Alice!");
3844
3845        // Test with missing argument
3846        let empty_args = json!({});
3847        let result = handler.execute(empty_args).await?;
3848        assert_eq!(result["greeting"], "Hello, world!");
3849
3850        Ok(())
3851    }
3852
3853    #[tokio::test]
3854    async fn test_mcp_service_creation() {
3855        let service_id = "test_service_123".to_string();
3856        let node_id = "test_node_789".to_string();
3857
3858        let service = MCPService::new(service_id.clone(), node_id.clone());
3859
3860        assert_eq!(service.service_id, service_id);
3861        assert_eq!(service.node_id, node_id);
3862        assert!(service.tools.is_empty());
3863        assert_eq!(service.metadata.name, "MCP Service");
3864        assert_eq!(service.metadata.version, "1.0.0");
3865        assert_eq!(service.metadata.health_status, ServiceHealthStatus::Healthy);
3866        assert_eq!(service.endpoint.protocol, "p2p");
3867        assert!(!service.endpoint.tls);
3868        assert!(!service.endpoint.auth_required);
3869    }
3870
3871    #[tokio::test]
3872    async fn test_mcp_capabilities_default() {
3873        let capabilities = MCPCapabilities::default();
3874
3875        assert!(capabilities.tools.is_some());
3876        assert!(capabilities.prompts.is_some());
3877        assert!(capabilities.resources.is_some());
3878        assert!(capabilities.logging.is_some());
3879
3880        let tools_cap = capabilities.tools.expect("Test assertion failed");
3881        assert_eq!(tools_cap.list_changed, Some(true));
3882
3883        let resources_cap = capabilities.resources.expect("Test assertion failed");
3884        assert_eq!(resources_cap.subscribe, Some(true));
3885        assert_eq!(resources_cap.list_changed, Some(true));
3886
3887        let logging_cap = capabilities.logging.expect("Test assertion failed");
3888        let levels = logging_cap.levels.expect("Test assertion failed");
3889        assert!(levels.contains(&MCPLogLevel::Debug));
3890        assert!(levels.contains(&MCPLogLevel::Info));
3891        assert!(levels.contains(&MCPLogLevel::Warning));
3892        assert!(levels.contains(&MCPLogLevel::Error));
3893    }
3894
3895    #[tokio::test]
3896    async fn test_mcp_request_creation() {
3897        let source_peer = "source_peer_123".to_string();
3898        let target_peer = "target_peer_456".to_string();
3899
3900        let request = MCPRequest {
3901            request_id: uuid::Uuid::new_v4().to_string(),
3902            source_peer: source_peer.clone(),
3903            target_peer: target_peer.clone(),
3904            message: MCPMessage::ListTools { cursor: None },
3905            timestamp: SystemTime::now(),
3906            timeout: Duration::from_secs(30),
3907            auth_token: Some("test_token".to_string()),
3908        };
3909
3910        assert_eq!(request.source_peer, source_peer);
3911        assert_eq!(request.target_peer, target_peer);
3912        assert_eq!(request.timeout, Duration::from_secs(30));
3913        assert_eq!(request.auth_token, Some("test_token".to_string()));
3914
3915        match request.message {
3916            MCPMessage::ListTools { cursor } => assert_eq!(cursor, None),
3917            _ => panic!("Wrong message type"),
3918        }
3919    }
3920
3921    #[tokio::test]
3922    async fn test_p2p_message_types() {
3923        // Test all P2P message types
3924        assert_eq!(P2PMCPMessageType::Request, P2PMCPMessageType::Request);
3925        assert_eq!(P2PMCPMessageType::Response, P2PMCPMessageType::Response);
3926        assert_eq!(
3927            P2PMCPMessageType::ServiceAdvertisement,
3928            P2PMCPMessageType::ServiceAdvertisement
3929        );
3930        assert_eq!(
3931            P2PMCPMessageType::ServiceDiscovery,
3932            P2PMCPMessageType::ServiceDiscovery
3933        );
3934
3935        // Test serialization of each type
3936        for msg_type in [
3937            P2PMCPMessageType::Request,
3938            P2PMCPMessageType::Response,
3939            P2PMCPMessageType::ServiceAdvertisement,
3940            P2PMCPMessageType::ServiceDiscovery,
3941        ] {
3942            let serialized = serde_json::to_string(&msg_type).expect("Test assertion failed");
3943            let deserialized: P2PMCPMessageType =
3944                serde_json::from_str(&serialized).expect("Test assertion failed");
3945            assert_eq!(msg_type, deserialized);
3946        }
3947    }
3948}