mcprs/server.rs
1//! # Módulo de Servidor MCP
2//!
3//! Este módulo implementa um servidor HTTP para receber e processar mensagens MCP.
4//! Ele fornece duas versões do servidor: uma básica e uma avançada com autenticação
5//! e gerenciamento de conversas.
6//!
7//! ## Exemplo de Uso Básico
8//!
9//! ```rust,no_run
10//! use mcprs::agent::AgentRegistry;
11//! use mcprs::agent_openai::create_openai_agent;
12//! use mcprs::server::run_http_server;
13//! use std::net::SocketAddr;
14//!
15//! # async fn example() {
16//! // Configurar variável de ambiente
17//! std::env::set_var("OPENAI_API_KEY", "sua-chave-aqui");
18//!
19//! // Criar e configurar registro de agentes
20//! let mut registry = AgentRegistry::new();
21//! registry.register_agent(Box::new(create_openai_agent(None)));
22//!
23//! // Iniciar o servidor HTTP
24//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
25//! run_http_server(registry, addr).await;
26//! # }
27//! ```
28//!
29//! ## Exemplo de Uso Avançado
30//!
31//! ```rust,no_run
32//! use mcprs::agent::AgentRegistry;
33//! use mcprs::agent_openai::create_openai_agent;
34//! use mcprs::auth::AuthConfig;
35//! use mcprs::conversation::ConversationManager;
36//! use mcprs::server::run_http_server_with_auth;
37//! use std::net::SocketAddr;
38//!
39//! # async fn example() {
40//! // Configurar os componentes
41//! let mut registry = AgentRegistry::new();
42//! registry.register_agent(Box::new(create_openai_agent(None)));
43//!
44//! let auth_config = AuthConfig::new();
45//! auth_config.add_token("token-secreto".to_string());
46//!
47//! let conversation_manager = ConversationManager::new(24);
48//!
49//! // Iniciar o servidor avançado
50//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
51//! run_http_server_with_auth(registry, auth_config, conversation_manager, addr).await;
52//! # }
53//! ```
54
55use axum::{
56 extract::Json,
57 http::StatusCode,
58 response::{
59 sse::{Event, Sse},
60 IntoResponse, Response,
61 },
62 routing::{get, post},
63 Extension, Router,
64};
65use futures::Stream;
66use serde_json::json;
67use std::convert::Infallible;
68use std::net::SocketAddr;
69use std::sync::Arc;
70use tokio::sync::RwLock;
71use tokio_stream::wrappers::ReceiverStream;
72use tracing::{error, info};
73use tracing_subscriber;
74
75use crate::agent::{AgentRegistry, MCPError, MCPMessage};
76use crate::auth::AuthConfig;
77use crate::conversation::ConversationManager;
78
79/// Estado compartilhado da aplicação no servidor.
80///
81/// Esta estrutura mantém referências a todos os componentes principais
82/// do servidor MCP e é compartilhada entre manipuladores de requisições.
83#[derive(Clone)]
84pub struct AppState {
85 /// Registro de agentes para roteamento de mensagens
86 registry: Arc<RwLock<AgentRegistry>>,
87
88 /// Configuração de autenticação (opcional)
89 #[allow(dead_code)]
90 auth_config: Option<AuthConfig>,
91
92 /// Gerenciador de conversas (opcional)
93 conversation_manager: Option<Arc<ConversationManager>>,
94}
95
96/// Estrutura para representar uma resposta de erro em JSON.
97#[derive(serde::Serialize, serde::Deserialize)]
98struct ErrorResponse {
99 /// Mensagem de erro
100 error: String,
101}
102
103/// Converte um MCPError em uma resposta HTTP.
104impl IntoResponse for MCPError {
105 fn into_response(self) -> Response {
106 let body = Json(ErrorResponse {
107 error: self.to_string(),
108 });
109 (StatusCode::BAD_REQUEST, body).into_response()
110 }
111}
112
113/// Inicia e executa o servidor HTTP MCP básico.
114///
115/// Esta é a versão mais simples do servidor, sem autenticação ou
116/// gerenciamento de conversas. Útil para testes e integração inicial.
117///
118/// # Argumentos
119/// * `registry` - O registro de agentes para processar mensagens
120/// * `addr` - O endereço e porta onde o servidor deve escutar
121///
122/// # Exemplo
123///
124/// ```rust,no_run
125/// use mcprs::agent::AgentRegistry;
126/// use mcprs::server::run_http_server;
127/// use std::net::SocketAddr;
128///
129/// # async fn example() {
130/// let registry = AgentRegistry::new();
131/// let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
132/// run_http_server(registry, addr).await;
133/// # }
134/// ```
135pub async fn run_http_server(registry: AgentRegistry, addr: SocketAddr) {
136 // Inicializa o logging.
137 tracing_subscriber::fmt::init();
138
139 // Criar AppState sem autenticação nem gerenciamento de conversa
140 let app_state = AppState {
141 registry: Arc::new(RwLock::new(registry)),
142 auth_config: None,
143 conversation_manager: None,
144 };
145
146 // Configura o roteador com a rota /mcp para requisições POST.
147 let app = Router::new()
148 .route("/mcp", post(handle_mcp))
149 .route("/health", get(|| async { "OK" }))
150 .with_state(app_state);
151
152 info!("Servidor MCP rodando em {}", addr);
153
154 axum::Server::bind(&addr)
155 .serve(app.into_make_service())
156 .await
157 .unwrap();
158}
159
160/// Inicia e executa o servidor HTTP MCP avançado com autenticação e gestão de conversas.
161///
162/// Esta versão do servidor inclui:
163/// - Autenticação via token Bearer
164/// - Gerenciamento de histórico de conversas
165/// - Suporte para streaming de respostas
166/// - Endpoints adicionais para gerenciar conversações
167///
168/// # Argumentos
169/// * `registry` - O registro de agentes para processar mensagens
170/// * `auth_config` - Configuração de autenticação
171/// * `conversation_manager` - Gerenciador de histórico de conversas
172/// * `addr` - O endereço e porta onde o servidor deve escutar
173///
174/// # Exemplo
175///
176/// ```rust,no_run
177/// use mcprs::agent::AgentRegistry;
178/// use mcprs::auth::AuthConfig;
179/// use mcprs::conversation::ConversationManager;
180/// use mcprs::server::run_http_server_with_auth;
181/// use std::net::SocketAddr;
182///
183/// # async fn example() {
184/// let registry = AgentRegistry::new();
185/// let auth_config = AuthConfig::new();
186/// let conversation_manager = ConversationManager::new(24);
187/// let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
188///
189/// run_http_server_with_auth(registry, auth_config, conversation_manager, addr).await;
190/// # }
191/// ```
192pub async fn run_http_server_with_auth(
193 registry: AgentRegistry,
194 auth_config: AuthConfig,
195 conversation_manager: ConversationManager,
196 addr: SocketAddr,
197) {
198 // Inicializa o logging.
199 tracing_subscriber::fmt::init();
200
201 let app_state = AppState {
202 registry: Arc::new(RwLock::new(registry)),
203 auth_config: Some(auth_config.clone()),
204 conversation_manager: Some(Arc::new(conversation_manager)),
205 };
206
207 // Configura as rotas
208 let app = Router::new()
209 .route("/mcp", post(handle_mcp))
210 .route("/mcp/stream", get(handle_stream_mcp))
211 .route("/conversation", post(create_conversation))
212 .route("/conversation/:id", get(get_conversation))
213 .route("/health", get(|| async { "OK" }))
214 .with_state(app_state)
215 .layer(Extension(auth_config));
216
217 info!("Servidor MCP avançado rodando em {}", addr);
218
219 axum::Server::bind(&addr)
220 .serve(app.into_make_service())
221 .await
222 .unwrap();
223}
224
225/// Handler para a rota /mcp.
226///
227/// Este handler recebe uma requisição POST com uma MCPMessage,
228/// valida-a, e a encaminha para o agente apropriado.
229///
230/// # Argumentos
231/// * `state` - O estado compartilhado da aplicação
232/// * `payload` - A mensagem MCP recebida no corpo da requisição
233///
234/// # Retorna
235/// * `Ok(Json<MCPMessage>)` - A resposta do agente
236/// * `Err(MCPError)` - Se ocorrer um erro no processamento
237async fn handle_mcp(
238 axum::extract::State(state): axum::extract::State<AppState>,
239 Json(payload): Json<MCPMessage>,
240) -> Result<Json<MCPMessage>, MCPError> {
241 // Validação do campo magic.
242 if payload.magic != "MCP0" {
243 error!("Magic inválido: {}", payload.magic);
244 return Ok(Json(MCPMessage::new(
245 "error",
246 json!({"message": "Magic inválido"}),
247 )));
248 }
249
250 // Processa a mensagem utilizando o registro de agentes.
251 let response = {
252 let reg = state.registry.read().await;
253 reg.process(payload).await?
254 };
255
256 Ok(Json(response))
257}
258
259/// Handler para o endpoint de streaming /mcp/stream.
260///
261/// Este handler é semelhante ao `handle_mcp`, mas retorna a resposta
262/// como um stream de eventos (Server-Sent Events).
263///
264/// # Argumentos
265/// * `state` - O estado compartilhado da aplicação
266/// * `payload` - A mensagem MCP recebida no corpo da requisição
267///
268/// # Retorna
269/// Um stream de eventos SSE com a resposta
270async fn handle_stream_mcp(
271 axum::extract::State(state): axum::extract::State<AppState>,
272 Json(payload): Json<MCPMessage>,
273) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
274 let (tx, rx) = tokio::sync::mpsc::channel(100);
275
276 // Inicia o processamento em uma task separada
277 tokio::spawn(async move {
278 // Validação do campo magic
279 if payload.magic != "MCP0" {
280 let _ = tx
281 .send(Ok(Event::default().data("Error: Invalid magic")))
282 .await;
283 return;
284 }
285
286 // Processa a mensagem e envia resultados para o stream
287 let reg = state.registry.read().await;
288 match reg.process(payload).await {
289 Ok(response) => {
290 let _ = tx
291 .send(Ok(
292 Event::default().data(serde_json::to_string(&response).unwrap_or_default())
293 ))
294 .await;
295 }
296 Err(error) => {
297 let _ = tx
298 .send(Ok(Event::default().data(format!("Error: {}", error))))
299 .await;
300 }
301 }
302 });
303
304 Sse::new(ReceiverStream::new(rx))
305}
306
307/// Endpoint para criar uma nova conversa.
308///
309/// # Argumentos
310/// * `state` - O estado compartilhado da aplicação
311///
312/// # Retorna
313/// * No sucesso: Status 201 Created com ID da conversa
314/// * No erro: Status 500 Internal Server Error ou 501 Not Implemented
315async fn create_conversation(
316 axum::extract::State(state): axum::extract::State<AppState>,
317) -> impl IntoResponse {
318 if let Some(ref conversation_manager) = state.conversation_manager {
319 match conversation_manager.create_conversation() {
320 Ok(conversation) => (
321 StatusCode::CREATED,
322 Json(json!({
323 "conversation_id": conversation.id,
324 "created_at": conversation.created_at.elapsed().unwrap_or_default().as_secs()
325 })),
326 ),
327 Err(e) => (
328 StatusCode::INTERNAL_SERVER_ERROR,
329 Json(json!({ "error": e })),
330 ),
331 }
332 } else {
333 (
334 StatusCode::NOT_IMPLEMENTED,
335 Json(json!({ "error": "Gerenciamento de conversas não está habilitado" })),
336 )
337 }
338}
339
340/// Endpoint para obter uma conversa existente pelo ID.
341///
342/// # Argumentos
343/// * `state` - O estado compartilhado da aplicação
344/// * `id` - O ID da conversa a ser recuperada
345///
346/// # Retorna
347/// * No sucesso: Status 200 OK com dados da conversa
348/// * No erro: Status 404 Not Found ou 501 Not Implemented
349async fn get_conversation(
350 axum::extract::State(state): axum::extract::State<AppState>,
351 axum::extract::Path(id): axum::extract::Path<String>,
352) -> impl IntoResponse {
353 if let Some(ref conversation_manager) = state.conversation_manager {
354 match conversation_manager.get_conversation(&id) {
355 Some(conversation) => {
356 let messages: Vec<_> = conversation
357 .messages
358 .iter()
359 .map(|msg| {
360 json!({
361 "role": msg.role,
362 "content": msg.content,
363 "timestamp": msg.timestamp.elapsed().unwrap_or_default().as_secs()
364 })
365 })
366 .collect();
367
368 (
369 StatusCode::OK,
370 Json(json!({
371 "conversation_id": conversation.id,
372 "messages": messages,
373 "metadata": conversation.metadata,
374 "created_at": conversation.created_at.elapsed().unwrap_or_default().as_secs(),
375 "updated_at": conversation.updated_at.elapsed().unwrap_or_default().as_secs()
376 })),
377 )
378 }
379 None => (
380 StatusCode::NOT_FOUND,
381 Json(json!({ "error": "Conversa não encontrada" })),
382 ),
383 }
384 } else {
385 (
386 StatusCode::NOT_IMPLEMENTED,
387 Json(json!({ "error": "Gerenciamento de conversas não está habilitado" })),
388 )
389 }
390}
391
392#[cfg(test)]
393mod tests {
394 use super::*;
395 use crate::agent::DummyAgent;
396 use axum::body::Body;
397 use axum::http::{Request, StatusCode};
398 use serde_json::json;
399 use tower::ServiceExt;
400
401 async fn build_test_app() -> Router {
402 // Criar um registro com um agente dummy para testes
403 let mut registry = AgentRegistry::new();
404 registry.register_agent(Box::new(DummyAgent {
405 api_key: "test_key".to_string(),
406 }));
407
408 // Estado da aplicação para testes
409 let app_state = AppState {
410 registry: Arc::new(RwLock::new(registry)),
411 auth_config: None,
412 conversation_manager: None,
413 };
414
415 // Configurar roteador
416 Router::new()
417 .route("/mcp", post(handle_mcp))
418 .with_state(app_state)
419 }
420
421 #[tokio::test]
422 async fn test_handle_mcp_valid_request() {
423 // Construir app de teste
424 let app = build_test_app().await;
425
426 // Criar requisição de teste
427 let message = MCPMessage::new("dummy:test", json!({"test": "value"}));
428 let request = Request::builder()
429 .uri("/mcp")
430 .method("POST")
431 .header("Content-Type", "application/json")
432 .body(Body::from(serde_json::to_string(&message).unwrap()))
433 .unwrap();
434
435 // Enviar requisição e verificar resposta
436 let response = app.oneshot(request).await.unwrap();
437 assert_eq!(response.status(), StatusCode::OK);
438
439 // Verificar corpo da resposta
440 let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
441 let response_message: MCPMessage = serde_json::from_slice(&body_bytes).unwrap();
442
443 assert_eq!(response_message.command, "dummy_response");
444 assert_eq!(response_message.payload, json!({"test": "value"}));
445 }
446
447 #[tokio::test]
448 async fn test_handle_mcp_invalid_magic() {
449 // Construir app de teste
450 let app = build_test_app().await;
451
452 // Criar requisição com magic inválido
453 let mut message = MCPMessage::new("dummy:test", json!({"test": "value"}));
454 message.magic = "INVALID".to_string();
455
456 let request = Request::builder()
457 .uri("/mcp")
458 .method("POST")
459 .header("Content-Type", "application/json")
460 .body(Body::from(serde_json::to_string(&message).unwrap()))
461 .unwrap();
462
463 // Enviar requisição e verificar resposta
464 let response = app.oneshot(request).await.unwrap();
465 assert_eq!(response.status(), StatusCode::OK);
466
467 // Verificar corpo da resposta
468 let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
469 let response_message: MCPMessage = serde_json::from_slice(&body_bytes).unwrap();
470
471 assert_eq!(response_message.command, "error");
472 assert!(response_message.payload["message"]
473 .as_str()
474 .unwrap()
475 .contains("inválido"));
476 }
477
478 #[tokio::test]
479 async fn test_handle_mcp_agent_not_found() {
480 // Construir app de teste
481 let app = build_test_app().await;
482
483 // Criar requisição para agente inexistente
484 let message = MCPMessage::new("nonexistent:test", json!({"test": "value"}));
485 let request = Request::builder()
486 .uri("/mcp")
487 .method("POST")
488 .header("Content-Type", "application/json")
489 .body(Body::from(serde_json::to_string(&message).unwrap()))
490 .unwrap();
491
492 // Enviar requisição e verificar resposta
493 let response = app.oneshot(request).await.unwrap();
494 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
495
496 // Verificar corpo da resposta
497 let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
498 let error_response: ErrorResponse = serde_json::from_slice(&body_bytes).unwrap();
499
500 assert!(error_response.error.contains("não foi encontrado"));
501 }
502}