pulseengine_mcp_server/server.rs

//! Generic MCP server implementation

use crate::{backend::McpBackend, handler::GenericServerHandler, middleware::MiddlewareStack};
use pulseengine_mcp_auth::{AuthConfig, AuthenticationManager};
use pulseengine_mcp_logging::{
    AlertConfig, AlertManager, DashboardConfig, DashboardManager, PerformanceProfiler,
    PersistenceConfig, ProfilingConfig, SanitizationConfig, StructuredLogger, TelemetryConfig,
    TelemetryManager,
};
use pulseengine_mcp_monitoring::{MetricsCollector, MonitoringConfig};
use pulseengine_mcp_protocol::*;
use pulseengine_mcp_security::{SecurityConfig, SecurityMiddleware};
use pulseengine_mcp_transport::{Transport, TransportConfig};

use std::sync::Arc;
use thiserror::Error;
use tokio::signal;
use tracing::{error, info, warn};

/// Error type for server operations
#[derive(Debug, Error)]
pub enum ServerError {
    #[error("Server configuration error: {0}")]
    Configuration(String),

    #[error("Transport error: {0}")]
    Transport(String),

    #[error("Authentication error: {0}")]
    Authentication(String),

    #[error("Backend error: {0}")]
    Backend(String),

    #[error("Server already running")]
    AlreadyRunning,

    #[error("Server not running")]
    NotRunning,

    #[error("Shutdown timeout")]
    ShutdownTimeout,
}

/// Server configuration
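///
/// A minimal sketch of overriding individual fields while keeping the remaining
/// defaults via struct-update syntax; the values shown are illustrative only:
///
/// ```rust,ignore
/// let config = ServerConfig {
///     shutdown_timeout_secs: 60,
///     ..ServerConfig::default()
/// };
/// ```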
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Server implementation information
    pub server_info: ServerInfo,

    /// Authentication configuration
    pub auth_config: AuthConfig,

    /// Transport configuration
    pub transport_config: TransportConfig,

    /// Security configuration
    pub security_config: SecurityConfig,

    /// Monitoring configuration
    pub monitoring_config: MonitoringConfig,

    /// Log sanitization configuration
    pub sanitization_config: SanitizationConfig,

    /// Metrics persistence configuration
    pub persistence_config: Option<PersistenceConfig>,

    /// Telemetry configuration
    pub telemetry_config: TelemetryConfig,

    /// Alert configuration
    pub alert_config: AlertConfig,

    /// Dashboard configuration
    pub dashboard_config: DashboardConfig,

    /// Profiling configuration
    pub profiling_config: ProfilingConfig,

    /// Enable graceful shutdown
    pub graceful_shutdown: bool,

    /// Shutdown timeout in seconds
    pub shutdown_timeout_secs: u64,
}

impl Default for ServerConfig {
    fn default() -> Self {
        Self {
            server_info: ServerInfo {
                protocol_version: ProtocolVersion::default(),
                capabilities: ServerCapabilities::default(),
                server_info: Implementation {
                    name: "MCP Server".to_string(),
                    version: "1.0.0".to_string(),
                },
                instructions: None,
            },
            auth_config: pulseengine_mcp_auth::default_config(),
            transport_config: pulseengine_mcp_transport::TransportConfig::default(),
            security_config: pulseengine_mcp_security::default_config(),
            monitoring_config: pulseengine_mcp_monitoring::default_config(),
            sanitization_config: SanitizationConfig::default(),
            persistence_config: None,
            telemetry_config: TelemetryConfig::default(),
            alert_config: AlertConfig::default(),
            dashboard_config: DashboardConfig::default(),
            profiling_config: ProfilingConfig::default(),
            graceful_shutdown: true,
            shutdown_timeout_secs: 30,
        }
    }
}

/// Generic MCP server with pluggable backend
pub struct McpServer<B: McpBackend> {
    backend: Arc<B>,
    handler: GenericServerHandler<B>,
    auth_manager: Arc<AuthenticationManager>,
    transport: Box<dyn Transport>,
    #[allow(dead_code)]
    middleware_stack: MiddlewareStack,
    monitoring_metrics: Arc<MetricsCollector>,
    #[allow(dead_code)]
    logging_metrics: Arc<pulseengine_mcp_logging::MetricsCollector>,
    #[allow(dead_code)]
    logger: StructuredLogger,
    telemetry: Option<TelemetryManager>,
    alert_manager: Arc<AlertManager>,
    dashboard_manager: Arc<DashboardManager>,
    profiler: Option<Arc<PerformanceProfiler>>,
    config: ServerConfig,
    running: Arc<tokio::sync::RwLock<bool>>,
}

impl<B: McpBackend + 'static> McpServer<B> {
    /// Create a new MCP server with the given backend and configuration
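    ///
    /// A construction sketch, assuming a hypothetical `MyBackend` type that
    /// implements `McpBackend` (not part of this crate):
    ///
    /// ```rust,ignore
    /// // `MyBackend` is a placeholder for your own backend implementation.
    /// let server = McpServer::new(MyBackend::default(), ServerConfig::default()).await?;
    /// ```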
    pub async fn new(backend: B, config: ServerConfig) -> std::result::Result<Self, ServerError> {
        // Initialize structured logging
        let logger = StructuredLogger::new();

        info!("Initializing MCP server with backend");

        // Initialize telemetry
        let telemetry = if config.telemetry_config.enabled {
            let mut telemetry_config = config.telemetry_config.clone();
            telemetry_config.service_name = config.server_info.server_info.name.clone();
            telemetry_config.service_version = config.server_info.server_info.version.clone();

            Some(TelemetryManager::new(telemetry_config).await.map_err(|e| {
                ServerError::Configuration(format!("Failed to initialize telemetry: {e}"))
            })?)
        } else {
            None
        };

        // Initialize authentication
        let auth_manager = Arc::new(
            AuthenticationManager::new(config.auth_config.clone())
                .await
                .map_err(|e| ServerError::Authentication(e.to_string()))?,
        );

        // Initialize transport
        let transport =
            pulseengine_mcp_transport::create_transport(config.transport_config.clone())
                .map_err(|e| ServerError::Transport(e.to_string()))?;

        // Initialize security middleware
        let security_middleware = SecurityMiddleware::new(config.security_config.clone());

        // Initialize monitoring
        let monitoring_metrics = Arc::new(MetricsCollector::new(config.monitoring_config.clone()));

        // Initialize logging metrics with optional persistence
        let logging_metrics = Arc::new(pulseengine_mcp_logging::MetricsCollector::new());
        if let Some(persistence_config) = config.persistence_config.clone() {
            logging_metrics
                .enable_persistence(persistence_config.clone())
                .await
                .map_err(|e| {
                    ServerError::Configuration(format!(
                        "Failed to initialize metrics persistence: {e}"
                    ))
                })?;
        }
        let middleware_stack = MiddlewareStack::new()
            .with_security(security_middleware)
            .with_monitoring(monitoring_metrics.clone())
            .with_auth(auth_manager.clone());

        // Create backend arc
        let backend = Arc::new(backend);

        // Initialize alert manager
        let alert_manager = Arc::new(AlertManager::new(config.alert_config.clone()));

        // Initialize dashboard manager
        let dashboard_manager = Arc::new(DashboardManager::new(config.dashboard_config.clone()));

        // Initialize profiler if enabled
        let profiler = if config.profiling_config.enabled {
            Some(Arc::new(PerformanceProfiler::new(
                config.profiling_config.clone(),
            )))
        } else {
            None
        };

        // Create handler
        let handler = GenericServerHandler::new(
            backend.clone(),
            auth_manager.clone(),
            middleware_stack.clone(),
        );

        Ok(Self {
            backend,
            handler,
            auth_manager,
            transport,
            middleware_stack,
            monitoring_metrics,
            logging_metrics,
            logger,
            telemetry,
            alert_manager,
            dashboard_manager,
            profiler,
            config,
            running: Arc::new(tokio::sync::RwLock::new(false)),
        })
    }

    /// Start the server
    #[tracing::instrument(skip(self))]
    pub async fn start(&mut self) -> std::result::Result<(), ServerError> {
        {
            let mut running = self.running.write().await;
            if *running {
                return Err(ServerError::AlreadyRunning);
            }
            *running = true;
        }

        info!("Starting MCP server");

        // Call backend startup hook
        self.backend
            .on_startup()
            .await
            .map_err(|e| ServerError::Backend(e.to_string()))?;

        // Start background services
        self.auth_manager
            .start_background_tasks()
            .await
            .map_err(|e| ServerError::Authentication(e.to_string()))?;

        // Start alert manager
        self.alert_manager.start().await;

        // Start dashboard manager with metrics updates
        self.start_dashboard_metrics_update().await;

        // Start profiler if enabled
        if let Some(profiler) = &self.profiler {
            profiler
                .start_session(
                    format!("server_session_{}", chrono::Utc::now().timestamp()),
                    pulseengine_mcp_logging::ProfilingSessionType::Continuous,
                )
                .await
                .map_err(|e| {
                    ServerError::Configuration(format!("Failed to start profiling session: {e}"))
                })?;
        }

        // Metrics persistence is now handled internally by the logging metrics collector
        // No need for manual snapshot saving

        // Start transport
        let handler = self.handler.clone();
        self.transport
            .start(Box::new(move |request| {
                let handler = handler.clone();
                Box::pin(async move {
                    match handler.handle_request(request).await {
                        Ok(response) => response,
                        Err(error) => Response {
                            jsonrpc: "2.0".to_string(),
                            id: serde_json::Value::Null,
                            result: None,
                            error: Some(error.into()),
                        },
                    }
                })
            }))
            .await
            .map_err(|e| ServerError::Transport(e.to_string()))?;

        info!("MCP server started successfully");

        // Setup graceful shutdown if enabled
        if self.config.graceful_shutdown {
            let running = self.running.clone();
            tokio::spawn(async move {
                signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
                warn!("Shutdown signal received");
                let mut running = running.write().await;
                *running = false;
            });
        }

        Ok(())
    }

    /// Stop the server gracefully
    pub async fn stop(&mut self) -> std::result::Result<(), ServerError> {
        {
            let mut running = self.running.write().await;
            if !*running {
                return Err(ServerError::NotRunning);
            }
            *running = false;
        }

        info!("Stopping MCP server");

        // Stop transport
        self.transport
            .stop()
            .await
            .map_err(|e| ServerError::Transport(e.to_string()))?;

        // Stop background services
        self.monitoring_metrics.stop_collection().await;

        self.auth_manager
            .stop_background_tasks()
            .await
            .map_err(|e| ServerError::Authentication(e.to_string()))?;

        // Stop profiler if enabled
        if let Some(profiler) = &self.profiler {
            profiler.stop_session().await.map_err(|e| {
                ServerError::Configuration(format!("Failed to stop profiling session: {e}"))
            })?;
        }

        // Shutdown telemetry
        if let Some(telemetry) = &self.telemetry {
            telemetry.shutdown().await.map_err(|e| {
                ServerError::Configuration(format!("Failed to shutdown telemetry: {e}"))
            })?;
        }

        // Call backend shutdown hook
        self.backend
            .on_shutdown()
            .await
            .map_err(|e| ServerError::Backend(e.to_string()))?;

        info!("MCP server stopped");
        Ok(())
    }

    /// Run the server until shutdown signal
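    ///
    /// A lifecycle sketch (again assuming a hypothetical `MyBackend`): `run` calls
    /// `start`, polls the running flag until a shutdown signal clears it, then calls
    /// `stop`:
    ///
    /// ```rust,ignore
    /// let mut server = McpServer::new(MyBackend::default(), ServerConfig::default()).await?;
    /// server.run().await?; // blocks until Ctrl+C, then shuts down gracefully
    /// ```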
    pub async fn run(&mut self) -> std::result::Result<(), ServerError> {
        self.start().await?;

        // Wait for shutdown signal
        loop {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

            let running = self.running.read().await;
            if !*running {
                break;
            }
        }

        self.stop().await?;
        Ok(())
    }

    /// Get server health status
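    ///
    /// A sketch of inspecting the aggregated result:
    ///
    /// ```rust,ignore
    /// let health = server.health_check().await?;
    /// if health.status != "healthy" {
    ///     // `components` maps "backend", "transport", and "auth" to individual results.
    ///     for (name, ok) in &health.components {
    ///         eprintln!("{name}: {}", if *ok { "ok" } else { "failing" });
    ///     }
    /// }
    /// ```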
    pub async fn health_check(&self) -> std::result::Result<HealthStatus, ServerError> {
        // Check backend health
        let backend_healthy = self.backend.health_check().await.is_ok();

        // Check transport health
        let transport_healthy = self.transport.health_check().await.is_ok();

        // Check auth health
        let auth_healthy = self.auth_manager.health_check().await.is_ok();

        let overall_healthy = backend_healthy && transport_healthy && auth_healthy;

        Ok(HealthStatus {
            status: if overall_healthy {
                "healthy".to_string()
            } else {
                "unhealthy".to_string()
            },
            components: vec![
                ("backend".to_string(), backend_healthy),
                ("transport".to_string(), transport_healthy),
                ("auth".to_string(), auth_healthy),
            ]
            .into_iter()
            .collect(),
            uptime_seconds: self.monitoring_metrics.get_uptime_seconds(),
        })
    }

    /// Get server metrics
    pub async fn get_metrics(&self) -> ServerMetrics {
        self.monitoring_metrics.get_current_metrics().await
    }

    /// Get server information
    pub fn get_server_info(&self) -> &ServerInfo {
        &self.config.server_info
    }

    /// Check if server is running
    pub async fn is_running(&self) -> bool {
        *self.running.read().await
    }

    /// Get alert manager
    pub fn get_alert_manager(&self) -> Arc<AlertManager> {
        self.alert_manager.clone()
    }

    /// Get dashboard manager
    pub fn get_dashboard_manager(&self) -> Arc<DashboardManager> {
        self.dashboard_manager.clone()
    }

    /// Get profiler
    pub fn get_profiler(&self) -> Option<Arc<PerformanceProfiler>> {
        self.profiler.clone()
    }

    /// Start dashboard metrics update loop
    async fn start_dashboard_metrics_update(&self) {
        if !self.config.dashboard_config.enabled {
            return;
        }

        let logging_metrics = self.logging_metrics.clone();
        let dashboard_manager = self.dashboard_manager.clone();
        let refresh_interval = self.config.dashboard_config.refresh_interval_secs;

        tokio::spawn(async move {
            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(refresh_interval));

            loop {
                interval.tick().await;

                // Get current metrics snapshot
                let metrics_snapshot = logging_metrics.get_metrics_snapshot().await;

                // Update dashboard with new metrics
                dashboard_manager.update_metrics(metrics_snapshot).await;
            }
        });
    }
}

/// Health status information
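///
/// A serialization sketch (assumes `serde_json` is available as a dependency):
///
/// ```rust,ignore
/// let health = server.health_check().await?;
/// // Yields an object with `status`, per-component booleans, and `uptime_seconds`.
/// println!("{}", serde_json::to_string_pretty(&health)?);
/// ```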
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct HealthStatus {
    pub status: String,
    pub components: std::collections::HashMap<String, bool>,
    pub uptime_seconds: u64,
}

// Re-export monitoring metrics type
pub use pulseengine_mcp_monitoring::ServerMetrics;