mcprs/
server.rs

1//! # Módulo de Servidor MCP
2//!
3//! Este módulo implementa um servidor HTTP para receber e processar mensagens MCP.
4//! Ele fornece duas versões do servidor: uma básica e uma avançada com autenticação
5//! e gerenciamento de conversas.
6//!
7//! ## Exemplo de Uso Básico
8//!
9//! ```rust,no_run
10//! use mcprs::agent::AgentRegistry;
11//! use mcprs::agent_openai::create_openai_agent;
12//! use mcprs::server::run_http_server;
13//! use std::net::SocketAddr;
14//!
15//! # async fn example() {
16//! // Configurar variável de ambiente
17//! std::env::set_var("OPENAI_API_KEY", "sua-chave-aqui");
18//!
19//! // Criar e configurar registro de agentes
20//! let mut registry = AgentRegistry::new();
21//! registry.register_agent(Box::new(create_openai_agent(None)));
22//!
23//! // Iniciar o servidor HTTP
24//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
25//! run_http_server(registry, addr).await;
26//! # }
27//! ```
28//!
29//! ## Exemplo de Uso Avançado
30//!
31//! ```rust,no_run
32//! use mcprs::agent::AgentRegistry;
33//! use mcprs::agent_openai::create_openai_agent;
34//! use mcprs::auth::AuthConfig;
35//! use mcprs::conversation::ConversationManager;
36//! use mcprs::server::run_http_server_with_auth;
37//! use std::net::SocketAddr;
38//!
39//! # async fn example() {
40//! // Configurar os componentes
41//! let mut registry = AgentRegistry::new();
42//! registry.register_agent(Box::new(create_openai_agent(None)));
43//!
44//! let auth_config = AuthConfig::new();
45//! auth_config.add_token("token-secreto".to_string());
46//!
47//! let conversation_manager = ConversationManager::new(24);
48//!
49//! // Iniciar o servidor avançado
50//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
51//! run_http_server_with_auth(registry, auth_config, conversation_manager, addr).await;
52//! # }
53//! ```
54
55use axum::{
56    extract::Json,
57    http::StatusCode,
58    response::{
59        sse::{Event, Sse},
60        IntoResponse, Response,
61    },
62    routing::{get, post},
63    Extension, Router,
64};
65use futures::Stream;
66use serde_json::json;
67use std::convert::Infallible;
68use std::net::SocketAddr;
69use std::sync::Arc;
70use tokio::sync::RwLock;
71use tokio_stream::wrappers::ReceiverStream;
72use tracing::{error, info};
73use tracing_subscriber;
74
75use crate::agent::{AgentRegistry, MCPError, MCPMessage};
76use crate::auth::AuthConfig;
77use crate::conversation::ConversationManager;
78
79/// Estado compartilhado da aplicação no servidor.
80///
81/// Esta estrutura mantém referências a todos os componentes principais
82/// do servidor MCP e é compartilhada entre manipuladores de requisições.
83#[derive(Clone)]
84pub struct AppState {
85    /// Registro de agentes para roteamento de mensagens
86    registry: Arc<RwLock<AgentRegistry>>,
87
88    /// Configuração de autenticação (opcional)
89    #[allow(dead_code)]
90    auth_config: Option<AuthConfig>,
91
92    /// Gerenciador de conversas (opcional)
93    conversation_manager: Option<Arc<ConversationManager>>,
94}
95
96/// Estrutura para representar uma resposta de erro em JSON.
97#[derive(serde::Serialize, serde::Deserialize)]
98struct ErrorResponse {
99    /// Mensagem de erro
100    error: String,
101}
102
103/// Converte um MCPError em uma resposta HTTP.
104impl IntoResponse for MCPError {
105    fn into_response(self) -> Response {
106        let body = Json(ErrorResponse {
107            error: self.to_string(),
108        });
109        (StatusCode::BAD_REQUEST, body).into_response()
110    }
111}
112
113/// Inicia e executa o servidor HTTP MCP básico.
114///
115/// Esta é a versão mais simples do servidor, sem autenticação ou
116/// gerenciamento de conversas. Útil para testes e integração inicial.
117///
118/// # Argumentos
119/// * `registry` - O registro de agentes para processar mensagens
120/// * `addr` - O endereço e porta onde o servidor deve escutar
121///
122/// # Exemplo
123///
124/// ```rust,no_run
125/// use mcprs::agent::AgentRegistry;
126/// use mcprs::server::run_http_server;
127/// use std::net::SocketAddr;
128///
129/// # async fn example() {
130/// let registry = AgentRegistry::new();
131/// let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
132/// run_http_server(registry, addr).await;
133/// # }
134/// ```
135pub async fn run_http_server(registry: AgentRegistry, addr: SocketAddr) {
136    // Inicializa o logging.
137    tracing_subscriber::fmt::init();
138
139    // Criar AppState sem autenticação nem gerenciamento de conversa
140    let app_state = AppState {
141        registry: Arc::new(RwLock::new(registry)),
142        auth_config: None,
143        conversation_manager: None,
144    };
145
146    // Configura o roteador com a rota /mcp para requisições POST.
147    let app = Router::new()
148        .route("/mcp", post(handle_mcp))
149        .route("/health", get(|| async { "OK" }))
150        .with_state(app_state);
151
152    info!("Servidor MCP rodando em {}", addr);
153
154    axum::Server::bind(&addr)
155        .serve(app.into_make_service())
156        .await
157        .unwrap();
158}
159
160/// Inicia e executa o servidor HTTP MCP avançado com autenticação e gestão de conversas.
161///
162/// Esta versão do servidor inclui:
163/// - Autenticação via token Bearer
164/// - Gerenciamento de histórico de conversas
165/// - Suporte para streaming de respostas
166/// - Endpoints adicionais para gerenciar conversações
167///
168/// # Argumentos
169/// * `registry` - O registro de agentes para processar mensagens
170/// * `auth_config` - Configuração de autenticação
171/// * `conversation_manager` - Gerenciador de histórico de conversas
172/// * `addr` - O endereço e porta onde o servidor deve escutar
173///
174/// # Exemplo
175///
176/// ```rust,no_run
177/// use mcprs::agent::AgentRegistry;
178/// use mcprs::auth::AuthConfig;
179/// use mcprs::conversation::ConversationManager;
180/// use mcprs::server::run_http_server_with_auth;
181/// use std::net::SocketAddr;
182///
183/// # async fn example() {
184/// let registry = AgentRegistry::new();
185/// let auth_config = AuthConfig::new();
186/// let conversation_manager = ConversationManager::new(24);
187/// let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
188///
189/// run_http_server_with_auth(registry, auth_config, conversation_manager, addr).await;
190/// # }
191/// ```
192pub async fn run_http_server_with_auth(
193    registry: AgentRegistry,
194    auth_config: AuthConfig,
195    conversation_manager: ConversationManager,
196    addr: SocketAddr,
197) {
198    // Inicializa o logging.
199    tracing_subscriber::fmt::init();
200
201    let app_state = AppState {
202        registry: Arc::new(RwLock::new(registry)),
203        auth_config: Some(auth_config.clone()),
204        conversation_manager: Some(Arc::new(conversation_manager)),
205    };
206
207    // Configura as rotas
208    let app = Router::new()
209        .route("/mcp", post(handle_mcp))
210        .route("/mcp/stream", get(handle_stream_mcp))
211        .route("/conversation", post(create_conversation))
212        .route("/conversation/:id", get(get_conversation))
213        .route("/health", get(|| async { "OK" }))
214        .with_state(app_state)
215        .layer(Extension(auth_config));
216
217    info!("Servidor MCP avançado rodando em {}", addr);
218
219    axum::Server::bind(&addr)
220        .serve(app.into_make_service())
221        .await
222        .unwrap();
223}
224
225/// Handler para a rota /mcp.
226///
227/// Este handler recebe uma requisição POST com uma MCPMessage,
228/// valida-a, e a encaminha para o agente apropriado.
229///
230/// # Argumentos
231/// * `state` - O estado compartilhado da aplicação
232/// * `payload` - A mensagem MCP recebida no corpo da requisição
233///
234/// # Retorna
235/// * `Ok(Json<MCPMessage>)` - A resposta do agente
236/// * `Err(MCPError)` - Se ocorrer um erro no processamento
237async fn handle_mcp(
238    axum::extract::State(state): axum::extract::State<AppState>,
239    Json(payload): Json<MCPMessage>,
240) -> Result<Json<MCPMessage>, MCPError> {
241    // Validação do campo magic.
242    if payload.magic != "MCP0" {
243        error!("Magic inválido: {}", payload.magic);
244        return Ok(Json(MCPMessage::new(
245            "error",
246            json!({"message": "Magic inválido"}),
247        )));
248    }
249
250    // Processa a mensagem utilizando o registro de agentes.
251    let response = {
252        let reg = state.registry.read().await;
253        reg.process(payload).await?
254    };
255
256    Ok(Json(response))
257}
258
259/// Handler para o endpoint de streaming /mcp/stream.
260///
261/// Este handler é semelhante ao `handle_mcp`, mas retorna a resposta
262/// como um stream de eventos (Server-Sent Events).
263///
264/// # Argumentos
265/// * `state` - O estado compartilhado da aplicação
266/// * `payload` - A mensagem MCP recebida no corpo da requisição
267///
268/// # Retorna
269/// Um stream de eventos SSE com a resposta
270async fn handle_stream_mcp(
271    axum::extract::State(state): axum::extract::State<AppState>,
272    Json(payload): Json<MCPMessage>,
273) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
274    let (tx, rx) = tokio::sync::mpsc::channel(100);
275
276    // Inicia o processamento em uma task separada
277    tokio::spawn(async move {
278        // Validação do campo magic
279        if payload.magic != "MCP0" {
280            let _ = tx
281                .send(Ok(Event::default().data("Error: Invalid magic")))
282                .await;
283            return;
284        }
285
286        // Processa a mensagem e envia resultados para o stream
287        let reg = state.registry.read().await;
288        match reg.process(payload).await {
289            Ok(response) => {
290                let _ = tx
291                    .send(Ok(
292                        Event::default().data(serde_json::to_string(&response).unwrap_or_default())
293                    ))
294                    .await;
295            }
296            Err(error) => {
297                let _ = tx
298                    .send(Ok(Event::default().data(format!("Error: {}", error))))
299                    .await;
300            }
301        }
302    });
303
304    Sse::new(ReceiverStream::new(rx))
305}
306
307/// Endpoint para criar uma nova conversa.
308///
309/// # Argumentos
310/// * `state` - O estado compartilhado da aplicação
311///
312/// # Retorna
313/// * No sucesso: Status 201 Created com ID da conversa
314/// * No erro: Status 500 Internal Server Error ou 501 Not Implemented
315async fn create_conversation(
316    axum::extract::State(state): axum::extract::State<AppState>,
317) -> impl IntoResponse {
318    if let Some(ref conversation_manager) = state.conversation_manager {
319        match conversation_manager.create_conversation() {
320            Ok(conversation) => (
321                StatusCode::CREATED,
322                Json(json!({
323                    "conversation_id": conversation.id,
324                    "created_at": conversation.created_at.elapsed().unwrap_or_default().as_secs()
325                })),
326            ),
327            Err(e) => (
328                StatusCode::INTERNAL_SERVER_ERROR,
329                Json(json!({ "error": e })),
330            ),
331        }
332    } else {
333        (
334            StatusCode::NOT_IMPLEMENTED,
335            Json(json!({ "error": "Gerenciamento de conversas não está habilitado" })),
336        )
337    }
338}
339
340/// Endpoint para obter uma conversa existente pelo ID.
341///
342/// # Argumentos
343/// * `state` - O estado compartilhado da aplicação
344/// * `id` - O ID da conversa a ser recuperada
345///
346/// # Retorna
347/// * No sucesso: Status 200 OK com dados da conversa
348/// * No erro: Status 404 Not Found ou 501 Not Implemented
349async fn get_conversation(
350    axum::extract::State(state): axum::extract::State<AppState>,
351    axum::extract::Path(id): axum::extract::Path<String>,
352) -> impl IntoResponse {
353    if let Some(ref conversation_manager) = state.conversation_manager {
354        match conversation_manager.get_conversation(&id) {
355            Some(conversation) => {
356                let messages: Vec<_> = conversation
357                    .messages
358                    .iter()
359                    .map(|msg| {
360                        json!({
361                            "role": msg.role,
362                            "content": msg.content,
363                            "timestamp": msg.timestamp.elapsed().unwrap_or_default().as_secs()
364                        })
365                    })
366                    .collect();
367
368                (
369                    StatusCode::OK,
370                    Json(json!({
371                        "conversation_id": conversation.id,
372                        "messages": messages,
373                        "metadata": conversation.metadata,
374                        "created_at": conversation.created_at.elapsed().unwrap_or_default().as_secs(),
375                        "updated_at": conversation.updated_at.elapsed().unwrap_or_default().as_secs()
376                    })),
377                )
378            }
379            None => (
380                StatusCode::NOT_FOUND,
381                Json(json!({ "error": "Conversa não encontrada" })),
382            ),
383        }
384    } else {
385        (
386            StatusCode::NOT_IMPLEMENTED,
387            Json(json!({ "error": "Gerenciamento de conversas não está habilitado" })),
388        )
389    }
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395    use crate::agent::DummyAgent;
396    use axum::body::Body;
397    use axum::http::{Request, StatusCode};
398    use serde_json::json;
399    use tower::ServiceExt;
400
401    async fn build_test_app() -> Router {
402        // Criar um registro com um agente dummy para testes
403        let mut registry = AgentRegistry::new();
404        registry.register_agent(Box::new(DummyAgent {
405            api_key: "test_key".to_string(),
406        }));
407
408        // Estado da aplicação para testes
409        let app_state = AppState {
410            registry: Arc::new(RwLock::new(registry)),
411            auth_config: None,
412            conversation_manager: None,
413        };
414
415        // Configurar roteador
416        Router::new()
417            .route("/mcp", post(handle_mcp))
418            .with_state(app_state)
419    }
420
421    #[tokio::test]
422    async fn test_handle_mcp_valid_request() {
423        // Construir app de teste
424        let app = build_test_app().await;
425
426        // Criar requisição de teste
427        let message = MCPMessage::new("dummy:test", json!({"test": "value"}));
428        let request = Request::builder()
429            .uri("/mcp")
430            .method("POST")
431            .header("Content-Type", "application/json")
432            .body(Body::from(serde_json::to_string(&message).unwrap()))
433            .unwrap();
434
435        // Enviar requisição e verificar resposta
436        let response = app.oneshot(request).await.unwrap();
437        assert_eq!(response.status(), StatusCode::OK);
438
439        // Verificar corpo da resposta
440        let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
441        let response_message: MCPMessage = serde_json::from_slice(&body_bytes).unwrap();
442
443        assert_eq!(response_message.command, "dummy_response");
444        assert_eq!(response_message.payload, json!({"test": "value"}));
445    }
446
447    #[tokio::test]
448    async fn test_handle_mcp_invalid_magic() {
449        // Construir app de teste
450        let app = build_test_app().await;
451
452        // Criar requisição com magic inválido
453        let mut message = MCPMessage::new("dummy:test", json!({"test": "value"}));
454        message.magic = "INVALID".to_string();
455
456        let request = Request::builder()
457            .uri("/mcp")
458            .method("POST")
459            .header("Content-Type", "application/json")
460            .body(Body::from(serde_json::to_string(&message).unwrap()))
461            .unwrap();
462
463        // Enviar requisição e verificar resposta
464        let response = app.oneshot(request).await.unwrap();
465        assert_eq!(response.status(), StatusCode::OK);
466
467        // Verificar corpo da resposta
468        let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
469        let response_message: MCPMessage = serde_json::from_slice(&body_bytes).unwrap();
470
471        assert_eq!(response_message.command, "error");
472        assert!(response_message.payload["message"]
473            .as_str()
474            .unwrap()
475            .contains("inválido"));
476    }
477
478    #[tokio::test]
479    async fn test_handle_mcp_agent_not_found() {
480        // Construir app de teste
481        let app = build_test_app().await;
482
483        // Criar requisição para agente inexistente
484        let message = MCPMessage::new("nonexistent:test", json!({"test": "value"}));
485        let request = Request::builder()
486            .uri("/mcp")
487            .method("POST")
488            .header("Content-Type", "application/json")
489            .body(Body::from(serde_json::to_string(&message).unwrap()))
490            .unwrap();
491
492        // Enviar requisição e verificar resposta
493        let response = app.oneshot(request).await.unwrap();
494        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
495
496        // Verificar corpo da resposta
497        let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
498        let error_response: ErrorResponse = serde_json::from_slice(&body_bytes).unwrap();
499
500        assert!(error_response.error.contains("não foi encontrado"));
501    }
502}