pulseengine_mcp_server/
server.rs1use crate::{backend::McpBackend, handler::GenericServerHandler, middleware::MiddlewareStack};
4use pulseengine_mcp_auth::{AuthConfig, AuthenticationManager};
5use pulseengine_mcp_monitoring::{MetricsCollector, MonitoringConfig};
6use pulseengine_mcp_protocol::*;
7use pulseengine_mcp_security::{SecurityConfig, SecurityMiddleware};
8use pulseengine_mcp_transport::{Transport, TransportConfig};
9
10use std::sync::Arc;
11use thiserror::Error;
12use tokio::signal;
13use tracing::{error, info, warn};
14
/// Errors that can occur while constructing, starting, stopping, or
/// running an [`McpServer`].
#[derive(Debug, Error)]
pub enum ServerError {
    /// The provided server configuration is invalid or inconsistent.
    #[error("Server configuration error: {0}")]
    Configuration(String),

    /// The transport layer failed to build, start, or stop.
    #[error("Transport error: {0}")]
    Transport(String),

    /// The authentication manager failed to initialize or manage its tasks.
    #[error("Authentication error: {0}")]
    Authentication(String),

    /// The backend reported a failure from its startup/shutdown hooks.
    #[error("Backend error: {0}")]
    Backend(String),

    /// `start` was called while the server was already running.
    #[error("Server already running")]
    AlreadyRunning,

    /// `stop` was called while the server was not running.
    #[error("Server not running")]
    NotRunning,

    /// Graceful shutdown did not complete within the configured timeout.
    #[error("Shutdown timeout")]
    ShutdownTimeout,
}
39
/// Top-level configuration for an [`McpServer`] instance.
///
/// A default configuration is available via [`Default`]; individual
/// subsystem configs can then be overridden field by field.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Identity, protocol version, and capabilities reported to clients.
    pub server_info: ServerInfo,

    /// Configuration for the authentication manager.
    pub auth_config: AuthConfig,

    /// Configuration for the transport layer.
    pub transport_config: TransportConfig,

    /// Configuration for the security middleware.
    pub security_config: SecurityConfig,

    /// Configuration for the metrics collector.
    pub monitoring_config: MonitoringConfig,

    /// When true, `start` installs a Ctrl+C listener that clears the
    /// running flag so `run` can shut down cleanly.
    pub graceful_shutdown: bool,

    /// Maximum time, in seconds, allowed for the shutdown sequence.
    pub shutdown_timeout_secs: u64,
}
64
65impl Default for ServerConfig {
66 fn default() -> Self {
67 Self {
68 server_info: ServerInfo {
69 protocol_version: ProtocolVersion::default(),
70 capabilities: ServerCapabilities::default(),
71 server_info: Implementation {
72 name: "MCP Server".to_string(),
73 version: "1.0.0".to_string(),
74 },
75 instructions: None,
76 },
77 auth_config: pulseengine_mcp_auth::default_config(),
78 transport_config: pulseengine_mcp_transport::TransportConfig::default(),
79 security_config: pulseengine_mcp_security::default_config(),
80 monitoring_config: pulseengine_mcp_monitoring::default_config(),
81 graceful_shutdown: true,
82 shutdown_timeout_secs: 30,
83 }
84 }
85}
86
/// A generic MCP server that wires a backend implementation to a
/// transport, with authentication, security middleware, and metrics.
pub struct McpServer<B: McpBackend> {
    /// The application-specific backend handling MCP operations.
    backend: Arc<B>,
    /// Request handler combining backend, auth, and middleware.
    handler: GenericServerHandler<B>,
    /// Authentication manager (also runs background tasks while started).
    auth_manager: Arc<AuthenticationManager>,
    /// Active transport; boxed because the concrete type is chosen at runtime.
    transport: Box<dyn Transport>,
    // Kept for ownership/lifetime of the middleware chain; not read directly.
    #[allow(dead_code)]
    middleware_stack: MiddlewareStack,
    /// Metrics collector started/stopped with the server.
    metrics: Arc<MetricsCollector>,
    /// Configuration captured at construction time.
    config: ServerConfig,
    /// Shared running flag; cleared by `stop` and by the Ctrl+C handler.
    running: Arc<tokio::sync::RwLock<bool>>,
}
99
100impl<B: McpBackend + 'static> McpServer<B> {
101 pub async fn new(backend: B, config: ServerConfig) -> std::result::Result<Self, ServerError> {
103 info!("Initializing MCP server with backend");
104
105 let auth_manager = Arc::new(
107 AuthenticationManager::new(config.auth_config.clone())
108 .await
109 .map_err(|e| ServerError::Authentication(e.to_string()))?,
110 );
111
112 let transport =
114 pulseengine_mcp_transport::create_transport(config.transport_config.clone())
115 .map_err(|e| ServerError::Transport(e.to_string()))?;
116
117 let security_middleware = SecurityMiddleware::new(config.security_config.clone());
119
120 let metrics = Arc::new(MetricsCollector::new(config.monitoring_config.clone()));
122
123 let middleware_stack = MiddlewareStack::new()
125 .with_security(security_middleware)
126 .with_monitoring(metrics.clone())
127 .with_auth(auth_manager.clone());
128
129 let backend = Arc::new(backend);
131
132 let handler = GenericServerHandler::new(
134 backend.clone(),
135 auth_manager.clone(),
136 middleware_stack.clone(),
137 );
138
139 Ok(Self {
140 backend,
141 handler,
142 auth_manager,
143 transport,
144 middleware_stack,
145 metrics,
146 config,
147 running: Arc::new(tokio::sync::RwLock::new(false)),
148 })
149 }
150
151 pub async fn start(&mut self) -> std::result::Result<(), ServerError> {
153 {
154 let mut running = self.running.write().await;
155 if *running {
156 return Err(ServerError::AlreadyRunning);
157 }
158 *running = true;
159 }
160
161 info!("Starting MCP server");
162
163 self.backend
165 .on_startup()
166 .await
167 .map_err(|e| ServerError::Backend(e.to_string()))?;
168
169 self.auth_manager
171 .start_background_tasks()
172 .await
173 .map_err(|e| ServerError::Authentication(e.to_string()))?;
174
175 self.metrics.start_collection();
176
177 let handler = self.handler.clone();
179 self.transport
180 .start(Box::new(move |request| {
181 let handler = handler.clone();
182 Box::pin(async move {
183 match handler.handle_request(request).await {
184 Ok(response) => response,
185 Err(error) => Response {
186 jsonrpc: "2.0".to_string(),
187 id: serde_json::Value::Null,
188 result: None,
189 error: Some(error.into()),
190 },
191 }
192 })
193 }))
194 .await
195 .map_err(|e| ServerError::Transport(e.to_string()))?;
196
197 info!("MCP server started successfully");
198
199 if self.config.graceful_shutdown {
201 let running = self.running.clone();
202 tokio::spawn(async move {
203 signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
204 warn!("Shutdown signal received");
205 let mut running = running.write().await;
206 *running = false;
207 });
208 }
209
210 Ok(())
211 }
212
213 pub async fn stop(&mut self) -> std::result::Result<(), ServerError> {
215 {
216 let mut running = self.running.write().await;
217 if !*running {
218 return Err(ServerError::NotRunning);
219 }
220 *running = false;
221 }
222
223 info!("Stopping MCP server");
224
225 self.transport
227 .stop()
228 .await
229 .map_err(|e| ServerError::Transport(e.to_string()))?;
230
231 self.metrics.stop_collection();
233
234 self.auth_manager
235 .stop_background_tasks()
236 .await
237 .map_err(|e| ServerError::Authentication(e.to_string()))?;
238
239 self.backend
241 .on_shutdown()
242 .await
243 .map_err(|e| ServerError::Backend(e.to_string()))?;
244
245 info!("MCP server stopped");
246 Ok(())
247 }
248
249 pub async fn run(&mut self) -> std::result::Result<(), ServerError> {
251 self.start().await?;
252
253 loop {
255 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
256
257 let running = self.running.read().await;
258 if !*running {
259 break;
260 }
261 }
262
263 self.stop().await?;
264 Ok(())
265 }
266
267 pub async fn health_check(&self) -> std::result::Result<HealthStatus, ServerError> {
269 let backend_healthy = self.backend.health_check().await.is_ok();
271
272 let transport_healthy = self.transport.health_check().await.is_ok();
274
275 let auth_healthy = self.auth_manager.health_check().await.is_ok();
277
278 let overall_healthy = backend_healthy && transport_healthy && auth_healthy;
279
280 Ok(HealthStatus {
281 status: if overall_healthy {
282 "healthy".to_string()
283 } else {
284 "unhealthy".to_string()
285 },
286 components: vec![
287 ("backend".to_string(), backend_healthy),
288 ("transport".to_string(), transport_healthy),
289 ("auth".to_string(), auth_healthy),
290 ]
291 .into_iter()
292 .collect(),
293 uptime_seconds: self.metrics.get_uptime_seconds(),
294 })
295 }
296
297 pub async fn get_metrics(&self) -> ServerMetrics {
299 self.metrics.get_current_metrics()
300 }
301
302 pub fn get_server_info(&self) -> &ServerInfo {
304 &self.config.server_info
305 }
306
307 pub async fn is_running(&self) -> bool {
309 *self.running.read().await
310 }
311}
312
/// Serializable health report produced by `McpServer::health_check`.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct HealthStatus {
    /// "healthy" if every component passed its health check, else "unhealthy".
    pub status: String,
    /// Per-component health flags, keyed by component name
    /// ("backend", "transport", "auth").
    pub components: std::collections::HashMap<String, bool>,
    /// Server uptime as reported by the metrics collector.
    pub uptime_seconds: u64,
}
320
321pub use pulseengine_mcp_monitoring::ServerMetrics;