use crate::{backend::McpBackend, handler::GenericServerHandler, middleware::MiddlewareStack};

use pulseengine_mcp_auth::{AuthConfig, AuthenticationManager};
use pulseengine_mcp_logging::{
    AlertConfig, AlertManager, DashboardConfig, DashboardManager, PerformanceProfiler,
    PersistenceConfig, ProfilingConfig, SanitizationConfig, StructuredLogger, TelemetryConfig,
    TelemetryManager,
};
use pulseengine_mcp_monitoring::{MetricsCollector, MonitoringConfig};
use pulseengine_mcp_protocol::*;
use pulseengine_mcp_security::{SecurityConfig, SecurityMiddleware};
use pulseengine_mcp_transport::{Transport, TransportConfig};

use std::sync::Arc;
use thiserror::Error;
use tokio::signal;
use tracing::{error, info, warn};

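/// Errors that can occur while configuring or running the server.
///
/// A hedged sketch of handling one variant specially (assumes a `server`
/// value obtained from `McpServer::new`):
///
/// ```rust,ignore
/// if let Err(ServerError::AlreadyRunning) = server.start().await {
///     tracing::warn!("start() called twice; ignoring");
/// }
/// ```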
#[derive(Debug, Error)]
pub enum ServerError {
    #[error("Server configuration error: {0}")]
    Configuration(String),

    #[error("Transport error: {0}")]
    Transport(String),

    #[error("Authentication error: {0}")]
    Authentication(String),

    #[error("Backend error: {0}")]
    Backend(String),

    #[error("Server already running")]
    AlreadyRunning,

    #[error("Server not running")]
    NotRunning,

    #[error("Shutdown timeout")]
    ShutdownTimeout,
}

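/// Configuration for assembling an `McpServer`: protocol metadata plus the
/// auth, transport, security, monitoring, and observability sub-configs.
///
/// A minimal sketch of overriding a few fields while keeping the defaults
/// (struct-update syntax; the field values here are illustrative):
///
/// ```rust,ignore
/// let config = ServerConfig {
///     graceful_shutdown: true,
///     shutdown_timeout_secs: 10,
///     ..ServerConfig::default()
/// };
/// ```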
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Protocol version, capabilities, and implementation metadata.
    pub server_info: ServerInfo,

    /// Authentication manager configuration.
    pub auth_config: AuthConfig,

    /// Transport selection and settings.
    pub transport_config: TransportConfig,

    /// Security middleware configuration.
    pub security_config: SecurityConfig,

    /// Metrics collection configuration.
    pub monitoring_config: MonitoringConfig,

    /// Log sanitization configuration.
    pub sanitization_config: SanitizationConfig,

    /// Optional persistence for collected metrics.
    pub persistence_config: Option<PersistenceConfig>,

    /// Telemetry configuration.
    pub telemetry_config: TelemetryConfig,

    /// Alerting configuration.
    pub alert_config: AlertConfig,

    /// Metrics dashboard configuration.
    pub dashboard_config: DashboardConfig,

    /// Performance profiling configuration.
    pub profiling_config: ProfilingConfig,

    /// Whether to install a Ctrl+C handler for graceful shutdown.
    pub graceful_shutdown: bool,

    /// Maximum time to wait for graceful shutdown, in seconds.
    pub shutdown_timeout_secs: u64,
}

impl Default for ServerConfig {
    fn default() -> Self {
        Self {
            server_info: ServerInfo {
                protocol_version: ProtocolVersion::default(),
                capabilities: ServerCapabilities::default(),
                server_info: Implementation {
                    name: "MCP Server".to_string(),
                    version: "1.0.0".to_string(),
                },
                instructions: None,
            },
            auth_config: pulseengine_mcp_auth::default_config(),
            transport_config: pulseengine_mcp_transport::TransportConfig::default(),
            security_config: pulseengine_mcp_security::default_config(),
            monitoring_config: pulseengine_mcp_monitoring::default_config(),
            sanitization_config: SanitizationConfig::default(),
            persistence_config: None,
            telemetry_config: TelemetryConfig::default(),
            alert_config: AlertConfig::default(),
            dashboard_config: DashboardConfig::default(),
            profiling_config: ProfilingConfig::default(),
            graceful_shutdown: true,
            shutdown_timeout_secs: 30,
        }
    }
}

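/// Generic MCP server that wires a backend into the transport, auth,
/// middleware, and observability stack.
///
/// A minimal sketch of bringing a server up, assuming some hypothetical
/// `MyBackend` type that implements `McpBackend`:
///
/// ```rust,ignore
/// let backend = MyBackend::default();
/// let mut server = McpServer::new(backend, ServerConfig::default()).await?;
/// server.run().await?;
/// ```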
pub struct McpServer<B: McpBackend> {
    backend: Arc<B>,
    handler: GenericServerHandler<B>,
    auth_manager: Arc<AuthenticationManager>,
    transport: Box<dyn Transport>,
    #[allow(dead_code)]
    middleware_stack: MiddlewareStack,
    monitoring_metrics: Arc<MetricsCollector>,
    #[allow(dead_code)]
    logging_metrics: Arc<pulseengine_mcp_logging::MetricsCollector>,
    #[allow(dead_code)]
    logger: StructuredLogger,
    telemetry: Option<TelemetryManager>,
    alert_manager: Arc<AlertManager>,
    dashboard_manager: Arc<DashboardManager>,
    profiler: Option<Arc<PerformanceProfiler>>,
    config: ServerConfig,
    running: Arc<tokio::sync::RwLock<bool>>,
}

impl<B: McpBackend + 'static> McpServer<B> {
    /// Create a new MCP server instance from a backend and configuration.
    pub async fn new(backend: B, config: ServerConfig) -> std::result::Result<Self, ServerError> {
        // Initialize structured logging
        let logger = StructuredLogger::new();

        info!("Initializing MCP server with backend");

        // Initialize telemetry if enabled, inheriting the server's identity
        let telemetry = if config.telemetry_config.enabled {
            let mut telemetry_config = config.telemetry_config.clone();
            telemetry_config.service_name = config.server_info.server_info.name.clone();
            telemetry_config.service_version = config.server_info.server_info.version.clone();

            Some(TelemetryManager::new(telemetry_config).await.map_err(|e| {
                ServerError::Configuration(format!("Failed to initialize telemetry: {e}"))
            })?)
        } else {
            None
        };

        // Set up authentication
        let auth_manager = Arc::new(
            AuthenticationManager::new(config.auth_config.clone())
                .await
                .map_err(|e| ServerError::Authentication(e.to_string()))?,
        );

        // Create the transport from configuration
        let transport =
            pulseengine_mcp_transport::create_transport(config.transport_config.clone())
                .map_err(|e| ServerError::Transport(e.to_string()))?;

        // Security and monitoring middleware
        let security_middleware = SecurityMiddleware::new(config.security_config.clone());

        let monitoring_metrics = Arc::new(MetricsCollector::new(config.monitoring_config.clone()));

        // Logging metrics, with optional persistence
        let logging_metrics = Arc::new(pulseengine_mcp_logging::MetricsCollector::new());
        if let Some(persistence_config) = config.persistence_config.clone() {
            logging_metrics
                .enable_persistence(persistence_config.clone())
                .await
                .map_err(|e| {
                    ServerError::Configuration(format!(
                        "Failed to initialize metrics persistence: {e}"
                    ))
                })?;
        }

        // Assemble the middleware stack
        let middleware_stack = MiddlewareStack::new()
            .with_security(security_middleware)
            .with_monitoring(monitoring_metrics.clone())
            .with_auth(auth_manager.clone());

        let backend = Arc::new(backend);

        // Observability managers
        let alert_manager = Arc::new(AlertManager::new(config.alert_config.clone()));

        let dashboard_manager = Arc::new(DashboardManager::new(config.dashboard_config.clone()));

        let profiler = if config.profiling_config.enabled {
            Some(Arc::new(PerformanceProfiler::new(
                config.profiling_config.clone(),
            )))
        } else {
            None
        };

        // Request handler that routes through the middleware stack
        let handler = GenericServerHandler::new(
            backend.clone(),
            auth_manager.clone(),
            middleware_stack.clone(),
        );

        Ok(Self {
            backend,
            handler,
            auth_manager,
            transport,
            middleware_stack,
            monitoring_metrics,
            logging_metrics,
            logger,
            telemetry,
            alert_manager,
            dashboard_manager,
            profiler,
            config,
            running: Arc::new(tokio::sync::RwLock::new(false)),
        })
    }

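    /// Start the server: runs the backend startup hook, spawns auth
    /// background tasks, starts alerting, dashboard updates, and (if
    /// enabled) a profiling session, then brings up the transport.
    ///
    /// Returns `ServerError::AlreadyRunning` if called while running.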
    #[tracing::instrument(skip(self))]
    pub async fn start(&mut self) -> std::result::Result<(), ServerError> {
        {
            let mut running = self.running.write().await;
            if *running {
                return Err(ServerError::AlreadyRunning);
            }
            *running = true;
        }

        info!("Starting MCP server");

        // Backend startup hook
        self.backend
            .on_startup()
            .await
            .map_err(|e| ServerError::Backend(e.to_string()))?;

        // Authentication background tasks
        self.auth_manager
            .start_background_tasks()
            .await
            .map_err(|e| ServerError::Authentication(e.to_string()))?;

        self.alert_manager.start().await;

        self.start_dashboard_metrics_update().await;

        // Begin a continuous profiling session if profiling is enabled
        if let Some(profiler) = &self.profiler {
            profiler
                .start_session(
                    format!("server_session_{}", chrono::Utc::now().timestamp()),
                    pulseengine_mcp_logging::ProfilingSessionType::Continuous,
                )
                .await
                .map_err(|e| {
                    ServerError::Configuration(format!("Failed to start profiling session: {e}"))
                })?;
        }

        // Start the transport, routing each request through the handler and
        // converting handler errors into JSON-RPC error responses
        let handler = self.handler.clone();
        self.transport
            .start(Box::new(move |request| {
                let handler = handler.clone();
                Box::pin(async move {
                    match handler.handle_request(request).await {
                        Ok(response) => response,
                        Err(error) => Response {
                            jsonrpc: "2.0".to_string(),
                            id: serde_json::Value::Null,
                            result: None,
                            error: Some(error.into()),
                        },
                    }
                })
            }))
            .await
            .map_err(|e| ServerError::Transport(e.to_string()))?;

        info!("MCP server started successfully");

        // Install a Ctrl+C handler that clears the running flag so `run`
        // can shut the server down gracefully
        if self.config.graceful_shutdown {
            let running = self.running.clone();
            tokio::spawn(async move {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to listen for shutdown signal: {e}");
                    return;
                }
                warn!("Shutdown signal received");
                let mut running = running.write().await;
                *running = false;
            });
        }

        Ok(())
    }

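    /// Stop the server: shuts down the transport, metrics collection, auth
    /// background tasks, profiling, and telemetry, then runs the backend
    /// shutdown hook.
    ///
    /// Returns `ServerError::NotRunning` if the server is not running.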
    pub async fn stop(&mut self) -> std::result::Result<(), ServerError> {
        {
            let mut running = self.running.write().await;
            if !*running {
                return Err(ServerError::NotRunning);
            }
            *running = false;
        }

        info!("Stopping MCP server");

        // Stop accepting new requests first
        self.transport
            .stop()
            .await
            .map_err(|e| ServerError::Transport(e.to_string()))?;

        self.monitoring_metrics.stop_collection().await;

        self.auth_manager
            .stop_background_tasks()
            .await
            .map_err(|e| ServerError::Authentication(e.to_string()))?;

        // End the profiling session, if one was started
        if let Some(profiler) = &self.profiler {
            profiler.stop_session().await.map_err(|e| {
                ServerError::Configuration(format!("Failed to stop profiling session: {e}"))
            })?;
        }

        // Flush and shut down telemetry
        if let Some(telemetry) = &self.telemetry {
            telemetry.shutdown().await.map_err(|e| {
                ServerError::Configuration(format!("Failed to shutdown telemetry: {e}"))
            })?;
        }

        // Backend shutdown hook runs last
        self.backend
            .on_shutdown()
            .await
            .map_err(|e| ServerError::Backend(e.to_string()))?;

        info!("MCP server stopped");
        Ok(())
    }

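    /// Start the server and block until the running flag is cleared (for
    /// example by the Ctrl+C handler), then stop it.
    ///
    /// A minimal sketch, assuming a hypothetical `MyBackend: McpBackend`:
    ///
    /// ```rust,ignore
    /// let mut server = McpServer::new(MyBackend::default(), ServerConfig::default()).await?;
    /// server.run().await?; // blocks until shutdown
    /// ```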
    pub async fn run(&mut self) -> std::result::Result<(), ServerError> {
        self.start().await?;

        // Poll the running flag until shutdown is requested
        loop {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

            let running = self.running.read().await;
            if !*running {
                break;
            }
        }

        self.stop().await?;
        Ok(())
    }

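    /// Report aggregate health across the backend, transport, and
    /// authentication manager.
    ///
    /// A usage sketch (assumes a running `server`):
    ///
    /// ```rust,ignore
    /// let health = server.health_check().await?;
    /// println!("{}: up {}s", health.status, health.uptime_seconds);
    /// ```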
    pub async fn health_check(&self) -> std::result::Result<HealthStatus, ServerError> {
        // Probe each component; any failure marks it unhealthy
        let backend_healthy = self.backend.health_check().await.is_ok();

        let transport_healthy = self.transport.health_check().await.is_ok();

        let auth_healthy = self.auth_manager.health_check().await.is_ok();

        let overall_healthy = backend_healthy && transport_healthy && auth_healthy;

        Ok(HealthStatus {
            status: if overall_healthy {
                "healthy".to_string()
            } else {
                "unhealthy".to_string()
            },
            components: vec![
                ("backend".to_string(), backend_healthy),
                ("transport".to_string(), transport_healthy),
                ("auth".to_string(), auth_healthy),
            ]
            .into_iter()
            .collect(),
            uptime_seconds: self.monitoring_metrics.get_uptime_seconds(),
        })
    }

    /// Get a snapshot of the current server metrics.
    pub async fn get_metrics(&self) -> ServerMetrics {
        self.monitoring_metrics.get_current_metrics().await
    }

    /// Get the configured server info (protocol version, capabilities, identity).
    pub fn get_server_info(&self) -> &ServerInfo {
        &self.config.server_info
    }

    /// Whether the server is currently running.
    pub async fn is_running(&self) -> bool {
        *self.running.read().await
    }

    /// Get a handle to the alert manager.
    pub fn get_alert_manager(&self) -> Arc<AlertManager> {
        self.alert_manager.clone()
    }

    /// Get a handle to the dashboard manager.
    pub fn get_dashboard_manager(&self) -> Arc<DashboardManager> {
        self.dashboard_manager.clone()
    }

    /// Get the performance profiler, if profiling is enabled.
    pub fn get_profiler(&self) -> Option<Arc<PerformanceProfiler>> {
        self.profiler.clone()
    }

    /// Spawn a background task that periodically pushes metrics snapshots
    /// to the dashboard, if the dashboard is enabled.
    async fn start_dashboard_metrics_update(&self) {
        if !self.config.dashboard_config.enabled {
            return;
        }

        let logging_metrics = self.logging_metrics.clone();
        let dashboard_manager = self.dashboard_manager.clone();
        let refresh_interval = self.config.dashboard_config.refresh_interval_secs;

        tokio::spawn(async move {
            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(refresh_interval));

            loop {
                interval.tick().await;

                let metrics_snapshot = logging_metrics.get_metrics_snapshot().await;

                dashboard_manager.update_metrics(metrics_snapshot).await;
            }
        });
    }
}

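/// Aggregate health report returned by [`McpServer::health_check`]:
/// an overall `"healthy"`/`"unhealthy"` status, per-component booleans,
/// and the server uptime in seconds.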
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct HealthStatus {
    /// Overall status: `"healthy"` or `"unhealthy"`.
    pub status: String,
    /// Per-component health, keyed by component name.
    pub components: std::collections::HashMap<String, bool>,
    /// Server uptime in seconds.
    pub uptime_seconds: u64,
}

pub use pulseengine_mcp_monitoring::ServerMetrics;