pulseengine_mcp_server/
server.rs

1//! Generic MCP server implementation
2
3use crate::{backend::McpBackend, handler::GenericServerHandler, middleware::MiddlewareStack};
4use pulseengine_mcp_auth::{AuthConfig, AuthenticationManager};
5use pulseengine_mcp_monitoring::{MetricsCollector, MonitoringConfig};
6use pulseengine_mcp_protocol::*;
7use pulseengine_mcp_security::{SecurityConfig, SecurityMiddleware};
8use pulseengine_mcp_transport::{Transport, TransportConfig};
9
10use std::sync::Arc;
11use thiserror::Error;
12use tokio::signal;
13use tracing::{error, info, warn};
14
15/// Error type for server operations
16#[derive(Debug, Error)]
17pub enum ServerError {
18    #[error("Server configuration error: {0}")]
19    Configuration(String),
20
21    #[error("Transport error: {0}")]
22    Transport(String),
23
24    #[error("Authentication error: {0}")]
25    Authentication(String),
26
27    #[error("Backend error: {0}")]
28    Backend(String),
29
30    #[error("Server already running")]
31    AlreadyRunning,
32
33    #[error("Server not running")]
34    NotRunning,
35
36    #[error("Shutdown timeout")]
37    ShutdownTimeout,
38}
39
40/// Server configuration
41#[derive(Debug, Clone)]
42pub struct ServerConfig {
43    /// Server implementation information
44    pub server_info: ServerInfo,
45
46    /// Authentication configuration
47    pub auth_config: AuthConfig,
48
49    /// Transport configuration
50    pub transport_config: TransportConfig,
51
52    /// Security configuration
53    pub security_config: SecurityConfig,
54
55    /// Monitoring configuration
56    pub monitoring_config: MonitoringConfig,
57
58    /// Enable graceful shutdown
59    pub graceful_shutdown: bool,
60
61    /// Shutdown timeout in seconds
62    pub shutdown_timeout_secs: u64,
63}
64
65impl Default for ServerConfig {
66    fn default() -> Self {
67        Self {
68            server_info: ServerInfo {
69                protocol_version: ProtocolVersion::default(),
70                capabilities: ServerCapabilities::default(),
71                server_info: Implementation {
72                    name: "MCP Server".to_string(),
73                    version: "1.0.0".to_string(),
74                },
75                instructions: None,
76            },
77            auth_config: pulseengine_mcp_auth::default_config(),
78            transport_config: pulseengine_mcp_transport::TransportConfig::default(),
79            security_config: pulseengine_mcp_security::default_config(),
80            monitoring_config: pulseengine_mcp_monitoring::default_config(),
81            graceful_shutdown: true,
82            shutdown_timeout_secs: 30,
83        }
84    }
85}
86
87/// Generic MCP server with pluggable backend
88pub struct McpServer<B: McpBackend> {
89    backend: Arc<B>,
90    handler: GenericServerHandler<B>,
91    auth_manager: Arc<AuthenticationManager>,
92    transport: Box<dyn Transport>,
93    #[allow(dead_code)]
94    middleware_stack: MiddlewareStack,
95    metrics: Arc<MetricsCollector>,
96    config: ServerConfig,
97    running: Arc<tokio::sync::RwLock<bool>>,
98}
99
100impl<B: McpBackend + 'static> McpServer<B> {
101    /// Create a new MCP server with the given backend and configuration
102    pub async fn new(backend: B, config: ServerConfig) -> std::result::Result<Self, ServerError> {
103        info!("Initializing MCP server with backend");
104
105        // Initialize authentication
106        let auth_manager = Arc::new(
107            AuthenticationManager::new(config.auth_config.clone())
108                .await
109                .map_err(|e| ServerError::Authentication(e.to_string()))?,
110        );
111
112        // Initialize transport
113        let transport =
114            pulseengine_mcp_transport::create_transport(config.transport_config.clone())
115                .map_err(|e| ServerError::Transport(e.to_string()))?;
116
117        // Initialize security middleware
118        let security_middleware = SecurityMiddleware::new(config.security_config.clone());
119
120        // Initialize monitoring
121        let metrics = Arc::new(MetricsCollector::new(config.monitoring_config.clone()));
122
123        // Create middleware stack
124        let middleware_stack = MiddlewareStack::new()
125            .with_security(security_middleware)
126            .with_monitoring(metrics.clone())
127            .with_auth(auth_manager.clone());
128
129        // Create backend arc
130        let backend = Arc::new(backend);
131
132        // Create handler
133        let handler = GenericServerHandler::new(
134            backend.clone(),
135            auth_manager.clone(),
136            middleware_stack.clone(),
137        );
138
139        Ok(Self {
140            backend,
141            handler,
142            auth_manager,
143            transport,
144            middleware_stack,
145            metrics,
146            config,
147            running: Arc::new(tokio::sync::RwLock::new(false)),
148        })
149    }
150
151    /// Start the server
152    pub async fn start(&mut self) -> std::result::Result<(), ServerError> {
153        {
154            let mut running = self.running.write().await;
155            if *running {
156                return Err(ServerError::AlreadyRunning);
157            }
158            *running = true;
159        }
160
161        info!("Starting MCP server");
162
163        // Call backend startup hook
164        self.backend
165            .on_startup()
166            .await
167            .map_err(|e| ServerError::Backend(e.to_string()))?;
168
169        // Start background services
170        self.auth_manager
171            .start_background_tasks()
172            .await
173            .map_err(|e| ServerError::Authentication(e.to_string()))?;
174
175        self.metrics.start_collection();
176
177        // Start transport
178        let handler = self.handler.clone();
179        self.transport
180            .start(Box::new(move |request| {
181                let handler = handler.clone();
182                Box::pin(async move {
183                    match handler.handle_request(request).await {
184                        Ok(response) => response,
185                        Err(error) => Response {
186                            jsonrpc: "2.0".to_string(),
187                            id: serde_json::Value::Null,
188                            result: None,
189                            error: Some(error.into()),
190                        },
191                    }
192                })
193            }))
194            .await
195            .map_err(|e| ServerError::Transport(e.to_string()))?;
196
197        info!("MCP server started successfully");
198
199        // Setup graceful shutdown if enabled
200        if self.config.graceful_shutdown {
201            let running = self.running.clone();
202            tokio::spawn(async move {
203                signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
204                warn!("Shutdown signal received");
205                let mut running = running.write().await;
206                *running = false;
207            });
208        }
209
210        Ok(())
211    }
212
213    /// Stop the server gracefully
214    pub async fn stop(&mut self) -> std::result::Result<(), ServerError> {
215        {
216            let mut running = self.running.write().await;
217            if !*running {
218                return Err(ServerError::NotRunning);
219            }
220            *running = false;
221        }
222
223        info!("Stopping MCP server");
224
225        // Stop transport
226        self.transport
227            .stop()
228            .await
229            .map_err(|e| ServerError::Transport(e.to_string()))?;
230
231        // Stop background services
232        self.metrics.stop_collection();
233
234        self.auth_manager
235            .stop_background_tasks()
236            .await
237            .map_err(|e| ServerError::Authentication(e.to_string()))?;
238
239        // Call backend shutdown hook
240        self.backend
241            .on_shutdown()
242            .await
243            .map_err(|e| ServerError::Backend(e.to_string()))?;
244
245        info!("MCP server stopped");
246        Ok(())
247    }
248
249    /// Run the server until shutdown signal
250    pub async fn run(&mut self) -> std::result::Result<(), ServerError> {
251        self.start().await?;
252
253        // Wait for shutdown signal
254        loop {
255            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
256
257            let running = self.running.read().await;
258            if !*running {
259                break;
260            }
261        }
262
263        self.stop().await?;
264        Ok(())
265    }
266
267    /// Get server health status
268    pub async fn health_check(&self) -> std::result::Result<HealthStatus, ServerError> {
269        // Check backend health
270        let backend_healthy = self.backend.health_check().await.is_ok();
271
272        // Check transport health
273        let transport_healthy = self.transport.health_check().await.is_ok();
274
275        // Check auth health
276        let auth_healthy = self.auth_manager.health_check().await.is_ok();
277
278        let overall_healthy = backend_healthy && transport_healthy && auth_healthy;
279
280        Ok(HealthStatus {
281            status: if overall_healthy {
282                "healthy".to_string()
283            } else {
284                "unhealthy".to_string()
285            },
286            components: vec![
287                ("backend".to_string(), backend_healthy),
288                ("transport".to_string(), transport_healthy),
289                ("auth".to_string(), auth_healthy),
290            ]
291            .into_iter()
292            .collect(),
293            uptime_seconds: self.metrics.get_uptime_seconds(),
294        })
295    }
296
297    /// Get server metrics
298    pub async fn get_metrics(&self) -> ServerMetrics {
299        self.metrics.get_current_metrics()
300    }
301
302    /// Get server information
303    pub fn get_server_info(&self) -> &ServerInfo {
304        &self.config.server_info
305    }
306
307    /// Check if server is running
308    pub async fn is_running(&self) -> bool {
309        *self.running.read().await
310    }
311}
312
313/// Health status information
314#[derive(Debug, serde::Serialize, serde::Deserialize)]
315pub struct HealthStatus {
316    pub status: String,
317    pub components: std::collections::HashMap<String, bool>,
318    pub uptime_seconds: u64,
319}
320
321// Re-export monitoring metrics type
322pub use pulseengine_mcp_monitoring::ServerMetrics;