Skip to main content

fastmcp_transport/
http.rs

1//! HTTP transport for FastMCP.
2//!
3//! This module provides HTTP-based transport for MCP servers, enabling
4//! web-based deployments without relying on stdio or WebSockets.
5//!
6//! # Modes
7//!
8//! The HTTP transport supports two modes:
9//!
10//! - **Stateless**: Each HTTP request contains a single JSON-RPC message and receives
11//!   a single response. No session state is maintained between requests.
12//!
13//! - **Streamable**: Long-lived connections using HTTP streaming (chunked transfer)
14//!   for bidirectional communication. Supports Server-Sent Events (SSE) for
15//!   server-to-client notifications.
16//!
17//! # Integration
18//!
19//! This transport is designed to integrate with any HTTP server framework.
20//! It provides:
21//!
22//! - [`HttpRequestHandler`]: Processes incoming HTTP requests containing JSON-RPC messages
23//! - [`HttpTransport`]: Full transport implementation for HTTP connections
24//! - [`StreamableHttpTransport`]: Streaming transport for long-lived connections
25//!
26//! # Example
27//!
28//! ```ignore
29//! use fastmcp_transport::http::{HttpRequestHandler, HttpRequest, HttpResponse};
30//!
31//! let handler = HttpRequestHandler::new();
32//!
33//! // In your HTTP server's request handler:
//! fn handle_mcp_request(http_req: YourHttpRequest) -> YourHttpResponse {
//!     let request = HttpRequest {
//!         method: http_req.method(),
//!         path: http_req.path(),
//!         headers: http_req.headers(),
//!         body: http_req.body(),
//!         query: http_req.query(),
//!     };
//!
//!     let mcp_response = handler.handle(&cx, request)?;
//!
//!     YourHttpResponse::new()
//!         .status(mcp_response.status)
//!         .header("Content-Type", &mcp_response.headers["content-type"])
//!         .body(mcp_response.body)
//! }
49//! ```
50
51use std::collections::{HashMap, VecDeque};
52use std::io::{Read, Write};
53use std::sync::{Arc, Mutex};
54use std::time::{Duration, Instant};
55
56use asupersync::Cx;
57use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
58
59use crate::{Codec, CodecError, Transport, TransportError};
60
61// =============================================================================
62// HTTP Request/Response Types
63// =============================================================================
64
/// HTTP request methods understood by the transport.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpMethod {
    /// `GET`
    Get,
    /// `POST`
    Post,
    /// `PUT`
    Put,
    /// `DELETE`
    Delete,
    /// `OPTIONS`
    Options,
    /// `HEAD`
    Head,
    /// `PATCH`
    Patch,
}

impl HttpMethod {
    /// Every supported method; [`HttpMethod::parse`] searches this table.
    const ALL: [Self; 7] = [
        Self::Get,
        Self::Post,
        Self::Put,
        Self::Delete,
        Self::Options,
        Self::Head,
        Self::Patch,
    ];

    /// Parses an HTTP method from a string.
    ///
    /// The comparison ignores ASCII case only, so `"post"` and `"POST"` both
    /// yield [`HttpMethod::Post`]. Non-ASCII look-alikes such as `"poſt"`
    /// (whose Unicode upper-casing is `"POST"`) are rejected, and no temporary
    /// string is allocated.
    ///
    /// Returns `None` for any unrecognised method name, including `""`.
    #[must_use]
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|method| s.eq_ignore_ascii_case(method.as_str()))
    }

    /// Returns the canonical upper-case name of the method.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Get => "GET",
            Self::Post => "POST",
            Self::Put => "PUT",
            Self::Delete => "DELETE",
            Self::Options => "OPTIONS",
            Self::Head => "HEAD",
            Self::Patch => "PATCH",
        }
    }
}
107
/// Thin wrapper around a numeric HTTP status code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HttpStatus(pub u16);

impl HttpStatus {
    /// 200 OK.
    pub const OK: Self = Self(200);
    /// 202 Accepted.
    pub const ACCEPTED: Self = Self(202);
    /// 400 Bad Request.
    pub const BAD_REQUEST: Self = Self(400);
    /// 401 Unauthorized.
    pub const UNAUTHORIZED: Self = Self(401);
    /// 403 Forbidden.
    pub const FORBIDDEN: Self = Self(403);
    /// 404 Not Found.
    pub const NOT_FOUND: Self = Self(404);
    /// 405 Method Not Allowed.
    pub const METHOD_NOT_ALLOWED: Self = Self(405);
    /// 500 Internal Server Error.
    pub const INTERNAL_SERVER_ERROR: Self = Self(500);
    /// 503 Service Unavailable.
    pub const SERVICE_UNAVAILABLE: Self = Self(503);

    /// Reports whether the code falls in the 2xx (success) range.
    #[must_use]
    pub fn is_success(&self) -> bool {
        matches!(self.0, 200..=299)
    }

    /// Reports whether the code falls in the 4xx (client error) range.
    #[must_use]
    pub fn is_client_error(&self) -> bool {
        matches!(self.0, 400..=499)
    }

    /// Reports whether the code falls in the 5xx (server error) range.
    #[must_use]
    pub fn is_server_error(&self) -> bool {
        matches!(self.0, 500..=599)
    }
}
141
142/// Incoming HTTP request.
143#[derive(Debug, Clone)]
144pub struct HttpRequest {
145    /// HTTP method.
146    pub method: HttpMethod,
147    /// Request path (e.g., "/mcp/v1").
148    pub path: String,
149    /// Request headers.
150    pub headers: HashMap<String, String>,
151    /// Request body.
152    pub body: Vec<u8>,
153    /// Query parameters.
154    pub query: HashMap<String, String>,
155}
156
157impl HttpRequest {
158    /// Creates a new HTTP request.
159    #[must_use]
160    pub fn new(method: HttpMethod, path: impl Into<String>) -> Self {
161        Self {
162            method,
163            path: path.into(),
164            headers: HashMap::new(),
165            body: Vec::new(),
166            query: HashMap::new(),
167        }
168    }
169
170    /// Adds a header.
171    #[must_use]
172    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
173        self.headers
174            .insert(name.into().to_lowercase(), value.into());
175        self
176    }
177
178    /// Sets the body.
179    #[must_use]
180    pub fn with_body(mut self, body: impl Into<Vec<u8>>) -> Self {
181        self.body = body.into();
182        self
183    }
184
185    /// Adds a query parameter.
186    #[must_use]
187    pub fn with_query(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
188        self.query.insert(name.into(), value.into());
189        self
190    }
191
192    /// Gets a header value (case-insensitive).
193    #[must_use]
194    pub fn header(&self, name: &str) -> Option<&str> {
195        self.headers.get(&name.to_lowercase()).map(String::as_str)
196    }
197
198    /// Gets the Content-Type header.
199    #[must_use]
200    pub fn content_type(&self) -> Option<&str> {
201        self.header("content-type")
202    }
203
204    /// Gets the Authorization header.
205    #[must_use]
206    pub fn authorization(&self) -> Option<&str> {
207        self.header("authorization")
208    }
209
210    /// Parses the body as JSON.
211    pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
212        serde_json::from_slice(&self.body)
213    }
214}
215
216/// Outgoing HTTP response.
217#[derive(Debug, Clone)]
218pub struct HttpResponse {
219    /// HTTP status code.
220    pub status: HttpStatus,
221    /// Response headers.
222    pub headers: HashMap<String, String>,
223    /// Response body.
224    pub body: Vec<u8>,
225}
226
227impl HttpResponse {
228    /// Creates a new HTTP response with the given status.
229    #[must_use]
230    pub fn new(status: HttpStatus) -> Self {
231        let mut headers = HashMap::new();
232        headers.insert("content-type".to_string(), "application/json".to_string());
233        Self {
234            status,
235            headers,
236            body: Vec::new(),
237        }
238    }
239
240    /// Creates a 200 OK response.
241    #[must_use]
242    pub fn ok() -> Self {
243        Self::new(HttpStatus::OK)
244    }
245
246    /// Creates a 400 Bad Request response.
247    #[must_use]
248    pub fn bad_request() -> Self {
249        Self::new(HttpStatus::BAD_REQUEST)
250    }
251
252    /// Creates a 500 Internal Server Error response.
253    #[must_use]
254    pub fn internal_error() -> Self {
255        Self::new(HttpStatus::INTERNAL_SERVER_ERROR)
256    }
257
258    /// Adds a header.
259    #[must_use]
260    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
261        self.headers
262            .insert(name.into().to_lowercase(), value.into());
263        self
264    }
265
266    /// Sets the body.
267    #[must_use]
268    pub fn with_body(mut self, body: impl Into<Vec<u8>>) -> Self {
269        self.body = body.into();
270        self
271    }
272
273    /// Sets the body as JSON.
274    #[must_use]
275    pub fn with_json<T: serde::Serialize>(mut self, value: &T) -> Self {
276        self.body = serde_json::to_vec(value).unwrap_or_default();
277        self.headers
278            .insert("content-type".to_string(), "application/json".to_string());
279        self
280    }
281
282    /// Sets CORS headers for cross-origin requests.
283    #[must_use]
284    pub fn with_cors(mut self, origin: &str) -> Self {
285        self.headers.insert(
286            "access-control-allow-origin".to_string(),
287            origin.to_string(),
288        );
289        self.headers.insert(
290            "access-control-allow-methods".to_string(),
291            "GET, POST, OPTIONS".to_string(),
292        );
293        self.headers.insert(
294            "access-control-allow-headers".to_string(),
295            "Content-Type, Authorization".to_string(),
296        );
297        self
298    }
299}
300
301// =============================================================================
302// HTTP Error
303// =============================================================================
304
305/// HTTP transport error.
306#[derive(Debug)]
307pub enum HttpError {
308    /// Invalid HTTP method.
309    InvalidMethod(String),
310    /// Invalid Content-Type.
311    InvalidContentType(String),
312    /// HTTP headers exceeded the maximum allowed size.
313    HeadersTooLarge { size: usize, max: usize },
314    /// HTTP body exceeded the maximum allowed size.
315    BodyTooLarge { size: usize, max: usize },
316    /// Unsupported Transfer-Encoding.
317    UnsupportedTransferEncoding(String),
318    /// JSON parsing error.
319    JsonError(serde_json::Error),
320    /// Codec error.
321    CodecError(CodecError),
322    /// Request timeout.
323    Timeout,
324    /// Connection closed.
325    Closed,
326    /// Transport error.
327    Transport(TransportError),
328}
329
330impl std::fmt::Display for HttpError {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        match self {
333            Self::InvalidMethod(m) => write!(f, "invalid HTTP method: {}", m),
334            Self::InvalidContentType(ct) => write!(f, "invalid content type: {}", ct),
335            Self::HeadersTooLarge { size, max } => {
336                write!(f, "headers too large: {size} > {max} bytes")
337            }
338            Self::BodyTooLarge { size, max } => write!(f, "body too large: {size} > {max} bytes"),
339            Self::UnsupportedTransferEncoding(te) => {
340                write!(f, "unsupported transfer encoding: {}", te)
341            }
342            Self::JsonError(e) => write!(f, "JSON error: {}", e),
343            Self::CodecError(e) => write!(f, "codec error: {}", e),
344            Self::Timeout => write!(f, "request timeout"),
345            Self::Closed => write!(f, "connection closed"),
346            Self::Transport(e) => write!(f, "transport error: {}", e),
347        }
348    }
349}
350
351impl std::error::Error for HttpError {}
352
353impl From<serde_json::Error> for HttpError {
354    fn from(err: serde_json::Error) -> Self {
355        Self::JsonError(err)
356    }
357}
358
359impl From<CodecError> for HttpError {
360    fn from(err: CodecError) -> Self {
361        Self::CodecError(err)
362    }
363}
364
365impl From<TransportError> for HttpError {
366    fn from(err: TransportError) -> Self {
367        Self::Transport(err)
368    }
369}
370
371// =============================================================================
372// HTTP Request Handler
373// =============================================================================
374
/// Settings that control how [`HttpRequestHandler`] treats requests.
#[derive(Debug, Clone)]
pub struct HttpHandlerConfig {
    /// Base path for MCP endpoints (e.g., "/mcp/v1").
    pub base_path: String,
    /// Whether to allow CORS requests.
    pub allow_cors: bool,
    /// Allowed CORS origins ("*" for all).
    pub cors_origins: Vec<String>,
    /// Request timeout.
    pub timeout: Duration,
    /// Maximum request body size in bytes.
    pub max_body_size: usize,
}

impl Default for HttpHandlerConfig {
    /// Defaults: base path `/mcp/v1`, CORS enabled for every origin, a
    /// 30-second timeout and a 10 MB body limit.
    fn default() -> Self {
        const TEN_MEBIBYTES: usize = 10 * 1024 * 1024;

        let base_path = String::from("/mcp/v1");
        let cors_origins = vec![String::from("*")];
        let timeout = Duration::from_secs(30);

        Self {
            base_path,
            allow_cors: true,
            cors_origins,
            timeout,
            max_body_size: TEN_MEBIBYTES,
        }
    }
}
401
402/// Handles HTTP requests containing MCP JSON-RPC messages.
403///
404/// This handler is designed to be integrated with any HTTP server framework.
405/// It processes incoming HTTP requests, extracts JSON-RPC messages, and returns
406/// appropriate HTTP responses.
407pub struct HttpRequestHandler {
408    config: HttpHandlerConfig,
409    codec: Codec,
410}
411
412impl HttpRequestHandler {
413    /// Creates a new HTTP request handler with default configuration.
414    #[must_use]
415    pub fn new() -> Self {
416        Self::with_config(HttpHandlerConfig::default())
417    }
418
419    /// Creates a new HTTP request handler with the given configuration.
420    #[must_use]
421    pub fn with_config(config: HttpHandlerConfig) -> Self {
422        Self {
423            config,
424            codec: Codec::new(),
425        }
426    }
427
428    /// Returns the handler configuration.
429    #[must_use]
430    pub fn config(&self) -> &HttpHandlerConfig {
431        &self.config
432    }
433
434    /// Handles a CORS preflight OPTIONS request.
435    #[must_use]
436    pub fn handle_options(&self, request: &HttpRequest) -> HttpResponse {
437        if !self.config.allow_cors {
438            return HttpResponse::new(HttpStatus::METHOD_NOT_ALLOWED);
439        }
440
441        let origin = request.header("origin").unwrap_or("*");
442        let allowed = self.is_origin_allowed(origin);
443
444        if !allowed {
445            return HttpResponse::new(HttpStatus::FORBIDDEN);
446        }
447
448        HttpResponse::new(HttpStatus::OK)
449            .with_cors(origin)
450            .with_header("access-control-max-age", "86400")
451    }
452
453    /// Checks if the origin is allowed for CORS.
454    #[must_use]
455    pub fn is_origin_allowed(&self, origin: &str) -> bool {
456        self.config
457            .cors_origins
458            .iter()
459            .any(|o| o == "*" || o == origin)
460    }
461
462    /// Parses a JSON-RPC request from an HTTP request.
463    pub fn parse_request(&self, request: &HttpRequest) -> Result<JsonRpcRequest, HttpError> {
464        // Validate method
465        if request.method != HttpMethod::Post {
466            return Err(HttpError::InvalidMethod(
467                request.method.as_str().to_string(),
468            ));
469        }
470
471        // Validate content type
472        let content_type = request.content_type().unwrap_or("");
473        if !content_type.starts_with("application/json") {
474            return Err(HttpError::InvalidContentType(content_type.to_string()));
475        }
476
477        // Validate body size
478        if request.body.len() > self.config.max_body_size {
479            return Err(HttpError::BodyTooLarge {
480                size: request.body.len(),
481                max: self.config.max_body_size,
482            });
483        }
484
485        // Parse JSON-RPC request
486        let json_rpc: JsonRpcRequest = serde_json::from_slice(&request.body)?;
487        Ok(json_rpc)
488    }
489
490    /// Creates an HTTP response from a JSON-RPC response.
491    #[must_use]
492    pub fn create_response(
493        &self,
494        response: &JsonRpcResponse,
495        origin: Option<&str>,
496    ) -> HttpResponse {
497        let body = self.codec.encode_response(response).unwrap_or_default();
498
499        let mut http_response = HttpResponse::ok()
500            .with_body(body)
501            .with_header("content-type", "application/json");
502
503        if self.config.allow_cors {
504            if let Some(origin) = origin {
505                if self.is_origin_allowed(origin) {
506                    http_response = http_response.with_cors(origin);
507                }
508            }
509        }
510
511        http_response
512    }
513
514    /// Creates an error HTTP response.
515    #[must_use]
516    pub fn error_response(&self, status: HttpStatus, message: &str) -> HttpResponse {
517        let error = serde_json::json!({
518            "error": {
519                "code": -32600,
520                "message": message
521            }
522        });
523
524        HttpResponse::new(status).with_json(&error)
525    }
526}
527
528impl Default for HttpRequestHandler {
529    fn default() -> Self {
530        Self::new()
531    }
532}
533
534// =============================================================================
535// HTTP Transport
536// =============================================================================
537
538/// HTTP transport for stateless MCP communication.
539///
540/// In stateless mode, each HTTP request contains a single JSON-RPC message
541/// and receives a single response. This is suitable for simple integrations
542/// where session state is not needed.
543pub struct HttpTransport<R, W> {
544    reader: R,
545    writer: W,
546    codec: Codec,
547    closed: bool,
548    pending_responses: Vec<JsonRpcResponse>,
549}
550
551impl<R: Read, W: Write> HttpTransport<R, W> {
552    /// Creates a new HTTP transport.
553    #[must_use]
554    pub fn new(reader: R, writer: W) -> Self {
555        Self {
556            reader,
557            writer,
558            codec: Codec::new(),
559            closed: false,
560            pending_responses: Vec::new(),
561        }
562    }
563
564    /// Reads an HTTP request from the reader.
565    pub fn read_request(&mut self) -> Result<HttpRequest, HttpError> {
566        const MAX_HEADERS_SIZE: usize = 64 * 1024;
567        const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
568
569        let mut buffer = Vec::new();
570        let mut byte = [0u8; 1];
571
572        // Read headers until \r\n\r\n
573        loop {
574            if self
575                .reader
576                .read(&mut byte)
577                .map_err(|e| HttpError::Transport(e.into()))?
578                == 0
579            {
580                return Err(HttpError::Closed);
581            }
582            buffer.push(byte[0]);
583
584            if buffer.ends_with(b"\r\n\r\n") {
585                break;
586            }
587
588            // Prevent infinite loops
589            if buffer.len() > MAX_HEADERS_SIZE {
590                return Err(HttpError::HeadersTooLarge {
591                    size: buffer.len(),
592                    max: MAX_HEADERS_SIZE,
593                });
594            }
595        }
596
597        let header_str = String::from_utf8_lossy(&buffer);
598        let mut lines = header_str.lines();
599
600        // Parse request line
601        let request_line = lines
602            .next()
603            .ok_or_else(|| HttpError::InvalidMethod("missing request line".to_string()))?;
604
605        let parts: Vec<&str> = request_line.split_whitespace().collect();
606        if parts.len() < 2 {
607            return Err(HttpError::InvalidMethod("invalid request line".to_string()));
608        }
609
610        let method = HttpMethod::parse(parts[0])
611            .ok_or_else(|| HttpError::InvalidMethod(parts[0].to_string()))?;
612
613        let full_path = parts[1];
614        let (path, query_str) = full_path
615            .split_once('?')
616            .map_or((full_path.to_string(), None), |(p, q)| {
617                (p.to_string(), Some(q))
618            });
619
620        let mut query = HashMap::new();
621        if let Some(qs) = query_str {
622            for pair in qs.split('&') {
623                if pair.is_empty() {
624                    continue;
625                }
626                let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
627                query.insert(k.to_string(), v.to_string());
628            }
629        }
630
631        // Parse headers
632        let mut headers = HashMap::new();
633        for line in lines {
634            if line.is_empty() {
635                break;
636            }
637            if let Some((name, value)) = line.split_once(':') {
638                headers.insert(name.trim().to_lowercase(), value.trim().to_string());
639            }
640        }
641
642        // Read body.
643        //
644        // We support Content-Length or Transfer-Encoding: chunked. This is sufficient for MCP's
645        // JSON-RPC-over-HTTP payloads and avoids pulling in a full HTTP server stack here.
646        let mut body = Vec::new();
647
648        if let Some(te) = headers.get("transfer-encoding") {
649            if te.to_ascii_lowercase().contains("chunked") {
650                // Chunked transfer encoding
651                loop {
652                    // Read chunk size line (hex), terminated by CRLF.
653                    let mut line = Vec::new();
654                    loop {
655                        if self
656                            .reader
657                            .read(&mut byte)
658                            .map_err(|e| HttpError::Transport(e.into()))?
659                            == 0
660                        {
661                            return Err(HttpError::Closed);
662                        }
663                        line.push(byte[0]);
664                        if line.ends_with(b"\r\n") {
665                            break;
666                        }
667                        if line.len() > 1024 {
668                            return Err(HttpError::InvalidMethod(
669                                "invalid chunk size line".to_string(),
670                            ));
671                        }
672                    }
673
674                    let line_str = String::from_utf8_lossy(&line);
675                    let size_str = line_str.trim().split(';').next().unwrap_or("");
676                    let size = usize::from_str_radix(size_str, 16)
677                        .map_err(|_| HttpError::InvalidMethod("invalid chunk size".to_string()))?;
678
679                    if size == 0 {
680                        // Read and discard trailer headers until CRLF.
681                        let mut trailer = Vec::new();
682                        loop {
683                            trailer.clear();
684                            loop {
685                                if self
686                                    .reader
687                                    .read(&mut byte)
688                                    .map_err(|e| HttpError::Transport(e.into()))?
689                                    == 0
690                                {
691                                    return Err(HttpError::Closed);
692                                }
693                                trailer.push(byte[0]);
694                                if trailer.ends_with(b"\r\n") {
695                                    break;
696                                }
697                                if trailer.len() > MAX_HEADERS_SIZE {
698                                    return Err(HttpError::HeadersTooLarge {
699                                        size: trailer.len(),
700                                        max: MAX_HEADERS_SIZE,
701                                    });
702                                }
703                            }
704                            if trailer == b"\r\n" {
705                                break;
706                            }
707                        }
708                        break;
709                    }
710
711                    let mut chunk = vec![0u8; size];
712                    self.reader
713                        .read_exact(&mut chunk)
714                        .map_err(|e| HttpError::Transport(e.into()))?;
715                    body.extend_from_slice(&chunk);
716                    if body.len() > MAX_BODY_SIZE {
717                        return Err(HttpError::BodyTooLarge {
718                            size: body.len(),
719                            max: MAX_BODY_SIZE,
720                        });
721                    }
722
723                    // Consume trailing CRLF after the chunk.
724                    let mut crlf = [0u8; 2];
725                    self.reader
726                        .read_exact(&mut crlf)
727                        .map_err(|e| HttpError::Transport(e.into()))?;
728                    if &crlf != b"\r\n" {
729                        return Err(HttpError::InvalidMethod(
730                            "invalid chunk terminator".to_string(),
731                        ));
732                    }
733                }
734            } else {
735                return Err(HttpError::UnsupportedTransferEncoding(te.clone()));
736            }
737        } else {
738            // Content-Length (if present)
739            let content_length: usize = headers
740                .get("content-length")
741                .and_then(|s| s.parse().ok())
742                .unwrap_or(0);
743
744            if content_length > MAX_BODY_SIZE {
745                return Err(HttpError::BodyTooLarge {
746                    size: content_length,
747                    max: MAX_BODY_SIZE,
748                });
749            }
750
751            body.resize(content_length, 0);
752            if content_length > 0 {
753                self.reader
754                    .read_exact(&mut body)
755                    .map_err(|e| HttpError::Transport(e.into()))?;
756            }
757        }
758
759        Ok(HttpRequest {
760            method,
761            path,
762            headers,
763            body,
764            query,
765        })
766    }
767
768    /// Writes an HTTP response to the writer.
769    pub fn write_response(&mut self, response: &HttpResponse) -> Result<(), HttpError> {
770        let status_text = match response.status.0 {
771            200 => "OK",
772            400 => "Bad Request",
773            401 => "Unauthorized",
774            403 => "Forbidden",
775            404 => "Not Found",
776            500 => "Internal Server Error",
777            _ => "Unknown",
778        };
779
780        // Write status line
781        write!(
782            self.writer,
783            "HTTP/1.1 {} {}\r\n",
784            response.status.0, status_text
785        )
786        .map_err(|e| HttpError::Transport(e.into()))?;
787
788        // Write headers
789        for (name, value) in &response.headers {
790            write!(self.writer, "{}: {}\r\n", name, value)
791                .map_err(|e| HttpError::Transport(e.into()))?;
792        }
793
794        // Write content-length if not present
795        if !response.headers.contains_key("content-length") {
796            write!(self.writer, "content-length: {}\r\n", response.body.len())
797                .map_err(|e| HttpError::Transport(e.into()))?;
798        }
799
800        // End headers
801        write!(self.writer, "\r\n").map_err(|e| HttpError::Transport(e.into()))?;
802
803        // Write body
804        self.writer
805            .write_all(&response.body)
806            .map_err(|e| HttpError::Transport(e.into()))?;
807        self.writer
808            .flush()
809            .map_err(|e| HttpError::Transport(e.into()))?;
810
811        Ok(())
812    }
813
814    /// Queues a response to be sent.
815    pub fn queue_response(&mut self, response: JsonRpcResponse) {
816        self.pending_responses.push(response);
817    }
818}
819
820impl<R: Read, W: Write> Transport for HttpTransport<R, W> {
821    fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
822        if cx.is_cancel_requested() {
823            return Err(TransportError::Cancelled);
824        }
825
826        if self.closed {
827            return Err(TransportError::Closed);
828        }
829
830        let response = match message {
831            JsonRpcMessage::Response(r) => r.clone(),
832            JsonRpcMessage::Request(r) => {
833                // For HTTP transport, requests from server to client
834                // are typically sent as notifications or SSE events.
835                // This transport is request/response only and cannot deliver server-to-client
836                // requests. Returning Ok() would silently drop messages and can deadlock
837                // bidirectional protocols, so we fail explicitly.
838                let _ = r;
839                return Err(TransportError::Io(std::io::Error::other(
840                    "HttpTransport cannot send server-to-client requests",
841                )));
842            }
843        };
844
845        let http_response = HttpResponse::ok().with_json(&response);
846
847        self.write_response(&http_response)
848            .map_err(|_| TransportError::Io(std::io::Error::other("write error")))?;
849
850        Ok(())
851    }
852
853    fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
854        if cx.is_cancel_requested() {
855            return Err(TransportError::Cancelled);
856        }
857
858        if self.closed {
859            return Err(TransportError::Closed);
860        }
861
862        let http_request = self.read_request().map_err(|e| match e {
863            HttpError::Closed => TransportError::Closed,
864            HttpError::Timeout => TransportError::Timeout,
865            _ => TransportError::Io(std::io::Error::other(e.to_string())),
866        })?;
867
868        // Parse JSON-RPC from body
869        let json_rpc: JsonRpcRequest = serde_json::from_slice(&http_request.body)
870            .map_err(|e| TransportError::Codec(CodecError::Json(e)))?;
871
872        Ok(JsonRpcMessage::Request(json_rpc))
873    }
874
875    fn close(&mut self) -> Result<(), TransportError> {
876        self.closed = true;
877        Ok(())
878    }
879}
880
881// =============================================================================
882// Streaming HTTP Transport
883// =============================================================================
884
885/// Streaming HTTP transport for long-lived MCP connections.
886///
887/// This transport uses HTTP streaming (chunked transfer encoding) for
888/// server-to-client messages and regular POST requests for client-to-server
889/// messages.
890pub struct StreamableHttpTransport {
891    /// Request queue (from HTTP POST requests).
892    requests: Arc<Mutex<VecDeque<JsonRpcRequest>>>,
893    /// Response queue (to be sent via streaming).
894    responses: Arc<Mutex<VecDeque<JsonRpcResponse>>>,
895    /// Codec for message encoding.
896    codec: Codec,
897    /// Whether the transport is closed.
898    closed: bool,
899    /// Poll interval for checking new messages.
900    poll_interval: Duration,
901}
902
903impl StreamableHttpTransport {
904    /// Creates a new streaming HTTP transport.
905    #[must_use]
906    pub fn new() -> Self {
907        Self {
908            requests: Arc::new(Mutex::new(VecDeque::new())),
909            responses: Arc::new(Mutex::new(VecDeque::new())),
910            codec: Codec::new(),
911            closed: false,
912            poll_interval: Duration::from_millis(10),
913        }
914    }
915
916    /// Pushes a request into the queue (from HTTP handler).
917    pub fn push_request(&self, request: JsonRpcRequest) {
918        let mut guard = match self.requests.lock() {
919            Ok(guard) => guard,
920            Err(poisoned) => poisoned.into_inner(),
921        };
922        guard.push_back(request);
923    }
924
925    /// Pops a response from the queue (for HTTP streaming).
926    #[must_use]
927    pub fn pop_response(&self) -> Option<JsonRpcResponse> {
928        let mut guard = match self.responses.lock() {
929            Ok(guard) => guard,
930            Err(poisoned) => poisoned.into_inner(),
931        };
932        guard.pop_front()
933    }
934
935    /// Checks if there are pending responses.
936    #[must_use]
937    pub fn has_responses(&self) -> bool {
938        match self.responses.lock() {
939            Ok(guard) => !guard.is_empty(),
940            Err(poisoned) => !poisoned.into_inner().is_empty(),
941        }
942    }
943
944    /// Returns the request queue for external access.
945    #[must_use]
946    pub fn request_queue(&self) -> Arc<Mutex<VecDeque<JsonRpcRequest>>> {
947        Arc::clone(&self.requests)
948    }
949
950    /// Returns the response queue for external access.
951    #[must_use]
952    pub fn response_queue(&self) -> Arc<Mutex<VecDeque<JsonRpcResponse>>> {
953        Arc::clone(&self.responses)
954    }
955}
956
957impl Default for StreamableHttpTransport {
958    fn default() -> Self {
959        Self::new()
960    }
961}
962
963impl Transport for StreamableHttpTransport {
964    fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
965        if cx.is_cancel_requested() {
966            return Err(TransportError::Cancelled);
967        }
968
969        if self.closed {
970            return Err(TransportError::Closed);
971        }
972
973        match message {
974            JsonRpcMessage::Response(response) => {
975                let mut guard = self.responses.lock().map_err(|_| {
976                    TransportError::Io(std::io::Error::other(
977                        "streamable response queue lock poisoned",
978                    ))
979                })?;
980                guard.push_back(response.clone());
981            }
982            JsonRpcMessage::Request(_) => {
983                // This transport currently streams only JSON-RPC responses.
984                // Server-to-client requests/notifications cannot be represented
985                // in the current response queue shape, so fail explicitly to avoid
986                // silent message loss and potential protocol deadlocks.
987                return Err(TransportError::Io(std::io::Error::other(
988                    "StreamableHttpTransport cannot send server-to-client requests",
989                )));
990            }
991        }
992
993        Ok(())
994    }
995
996    fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
997        if cx.is_cancel_requested() {
998            return Err(TransportError::Cancelled);
999        }
1000
1001        if self.closed {
1002            return Err(TransportError::Closed);
1003        }
1004
1005        // Poll for requests
1006        loop {
1007            if cx.is_cancel_requested() {
1008                return Err(TransportError::Cancelled);
1009            }
1010
1011            let mut guard = self.requests.lock().map_err(|_| {
1012                TransportError::Io(std::io::Error::other(
1013                    "streamable request queue lock poisoned",
1014                ))
1015            })?;
1016            if let Some(request) = guard.pop_front() {
1017                return Ok(JsonRpcMessage::Request(request));
1018            }
1019            drop(guard);
1020
1021            // Sleep briefly before polling again
1022            std::thread::sleep(self.poll_interval);
1023        }
1024    }
1025
1026    fn close(&mut self) -> Result<(), TransportError> {
1027        self.closed = true;
1028        Ok(())
1029    }
1030}
1031
1032// =============================================================================
1033// Session Support
1034// =============================================================================
1035
/// HTTP session for maintaining state across requests.
///
/// A session carries an identifier, two timestamps used for idle-expiry
/// checks, and a free-form map of JSON values.
#[derive(Debug, Clone)]
pub struct HttpSession {
    /// Session ID.
    pub id: String,
    /// Session creation time.
    pub created_at: Instant,
    /// Last activity time; refreshed by `touch`, `set`, and `remove`.
    pub last_activity: Instant,
    /// Session data, keyed by string.
    pub data: HashMap<String, serde_json::Value>,
}
1048
1049impl HttpSession {
1050    /// Creates a new session with the given ID.
1051    #[must_use]
1052    pub fn new(id: impl Into<String>) -> Self {
1053        let now = Instant::now();
1054        Self {
1055            id: id.into(),
1056            created_at: now,
1057            last_activity: now,
1058            data: HashMap::new(),
1059        }
1060    }
1061
1062    /// Updates the last activity time.
1063    pub fn touch(&mut self) {
1064        self.last_activity = Instant::now();
1065    }
1066
1067    /// Checks if the session has expired.
1068    #[must_use]
1069    pub fn is_expired(&self, timeout: Duration) -> bool {
1070        self.last_activity.elapsed() > timeout
1071    }
1072
1073    /// Gets a session value.
1074    #[must_use]
1075    pub fn get(&self, key: &str) -> Option<&serde_json::Value> {
1076        self.data.get(key)
1077    }
1078
1079    /// Sets a session value.
1080    pub fn set(&mut self, key: impl Into<String>, value: serde_json::Value) {
1081        self.data.insert(key.into(), value);
1082        self.touch();
1083    }
1084
1085    /// Removes a session value.
1086    pub fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
1087        self.touch();
1088        self.data.remove(key)
1089    }
1090}
1091
1092/// Session store for HTTP sessions.
1093#[derive(Debug, Default)]
1094pub struct SessionStore {
1095    sessions: Mutex<HashMap<String, HttpSession>>,
1096    timeout: Duration,
1097}
1098
1099impl SessionStore {
1100    /// Creates a new session store with the given timeout.
1101    #[must_use]
1102    pub fn new(timeout: Duration) -> Self {
1103        Self {
1104            sessions: Mutex::new(HashMap::new()),
1105            timeout,
1106        }
1107    }
1108
1109    /// Creates a new session store with default 1-hour timeout.
1110    #[must_use]
1111    pub fn with_defaults() -> Self {
1112        Self::new(Duration::from_secs(3600))
1113    }
1114
1115    /// Creates a new session.
1116    #[must_use]
1117    pub fn create(&self) -> String {
1118        let id = generate_session_id();
1119        let session = HttpSession::new(&id);
1120
1121        if let Ok(mut guard) = self.sessions.lock() {
1122            guard.insert(id.clone(), session);
1123        }
1124
1125        id
1126    }
1127
1128    /// Gets a session by ID.
1129    #[must_use]
1130    pub fn get(&self, id: &str) -> Option<HttpSession> {
1131        let mut guard = self.sessions.lock().ok()?;
1132        let session = guard.get_mut(id)?;
1133
1134        if session.is_expired(self.timeout) {
1135            guard.remove(id);
1136            return None;
1137        }
1138
1139        session.touch();
1140        Some(session.clone())
1141    }
1142
1143    /// Updates a session.
1144    pub fn update(&self, session: HttpSession) {
1145        if let Ok(mut guard) = self.sessions.lock() {
1146            guard.insert(session.id.clone(), session);
1147        }
1148    }
1149
1150    /// Removes a session.
1151    pub fn remove(&self, id: &str) {
1152        if let Ok(mut guard) = self.sessions.lock() {
1153            guard.remove(id);
1154        }
1155    }
1156
1157    /// Removes expired sessions.
1158    pub fn cleanup(&self) {
1159        if let Ok(mut guard) = self.sessions.lock() {
1160            guard.retain(|_, s| !s.is_expired(self.timeout));
1161        }
1162    }
1163
1164    /// Returns the number of active sessions.
1165    #[must_use]
1166    pub fn count(&self) -> usize {
1167        self.sessions.lock().map(|g| g.len()).unwrap_or(0)
1168    }
1169}
1170
/// Generates a random session ID.
///
/// The ID is the 64-bit hash of the current wall-clock time (nanoseconds since
/// the Unix epoch, or 0 if the clock reads earlier than the epoch), computed
/// with a freshly created `RandomState`, and formatted as exactly 16 lowercase
/// hexadecimal characters.
///
/// NOTE(review): this does not draw from a dedicated cryptographic random
/// source; confirm that is acceptable wherever IDs act as secrets.
fn generate_session_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::BuildHasher;
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());

    // Hashing a `u128` feeds the hasher a single `write_u128`.
    let digest = RandomState::new().hash_one(nanos_since_epoch);

    format!("{digest:016x}")
}
1188
1189// =============================================================================
1190// Tests
1191// =============================================================================
1192
1193#[cfg(test)]
1194mod tests {
1195    use super::*;
1196
1197    #[test]
1198    fn test_http_method_parse() {
1199        assert_eq!(HttpMethod::parse("GET"), Some(HttpMethod::Get));
1200        assert_eq!(HttpMethod::parse("POST"), Some(HttpMethod::Post));
1201        assert_eq!(HttpMethod::parse("get"), Some(HttpMethod::Get));
1202        assert_eq!(HttpMethod::parse("INVALID"), None);
1203    }
1204
1205    #[test]
1206    fn test_http_status() {
1207        assert!(HttpStatus::OK.is_success());
1208        assert!(HttpStatus::BAD_REQUEST.is_client_error());
1209        assert!(HttpStatus::INTERNAL_SERVER_ERROR.is_server_error());
1210    }
1211
1212    #[test]
1213    fn test_http_request_builder() {
1214        let request = HttpRequest::new(HttpMethod::Post, "/api/mcp")
1215            .with_header("Content-Type", "application/json")
1216            .with_body(b"{\"test\": true}".to_vec())
1217            .with_query("version", "1");
1218
1219        assert_eq!(request.method, HttpMethod::Post);
1220        assert_eq!(request.path, "/api/mcp");
1221        assert_eq!(request.header("content-type"), Some("application/json"));
1222        assert_eq!(request.query.get("version"), Some(&"1".to_string()));
1223    }
1224
1225    #[test]
1226    fn test_http_response_builder() {
1227        let response = HttpResponse::ok()
1228            .with_header("X-Custom", "value")
1229            .with_body(b"Hello".to_vec());
1230
1231        assert_eq!(response.status, HttpStatus::OK);
1232        assert_eq!(response.headers.get("x-custom"), Some(&"value".to_string()));
1233        assert_eq!(response.body, b"Hello");
1234    }
1235
1236    #[test]
1237    fn test_http_response_json() {
1238        let data = serde_json::json!({"result": "ok"});
1239        let response = HttpResponse::ok().with_json(&data);
1240
1241        assert!(!response.body.is_empty());
1242        assert_eq!(
1243            response.headers.get("content-type"),
1244            Some(&"application/json".to_string())
1245        );
1246    }
1247
1248    #[test]
1249    fn test_http_response_cors() {
1250        let response = HttpResponse::ok().with_cors("https://example.com");
1251
1252        assert_eq!(
1253            response.headers.get("access-control-allow-origin"),
1254            Some(&"https://example.com".to_string())
1255        );
1256    }
1257
1258    #[test]
1259    fn test_http_handler_options() {
1260        let handler = HttpRequestHandler::new();
1261        let request = HttpRequest::new(HttpMethod::Options, "/mcp/v1")
1262            .with_header("Origin", "https://example.com");
1263
1264        let response = handler.handle_options(&request);
1265        assert_eq!(response.status, HttpStatus::OK);
1266    }
1267
1268    #[test]
1269    fn test_http_handler_parse_request() {
1270        let handler = HttpRequestHandler::new();
1271
1272        // Valid request
1273        let json_rpc = serde_json::json!({
1274            "jsonrpc": "2.0",
1275            "method": "test",
1276            "id": 1
1277        });
1278        let request = HttpRequest::new(HttpMethod::Post, "/mcp/v1")
1279            .with_header("Content-Type", "application/json")
1280            .with_body(serde_json::to_vec(&json_rpc).unwrap());
1281
1282        let result = handler.parse_request(&request);
1283        assert!(result.is_ok());
1284
1285        // Invalid method
1286        let request = HttpRequest::new(HttpMethod::Get, "/mcp/v1");
1287        assert!(handler.parse_request(&request).is_err());
1288
1289        // Invalid content type
1290        let request =
1291            HttpRequest::new(HttpMethod::Post, "/mcp/v1").with_header("Content-Type", "text/plain");
1292        assert!(handler.parse_request(&request).is_err());
1293    }
1294
1295    #[test]
1296    fn test_http_session() {
1297        let mut session = HttpSession::new("test-session");
1298        assert_eq!(session.id, "test-session");
1299
1300        session.set("key", serde_json::json!("value"));
1301        assert_eq!(session.get("key"), Some(&serde_json::json!("value")));
1302
1303        session.remove("key");
1304        assert!(session.get("key").is_none());
1305
1306        assert!(!session.is_expired(Duration::from_secs(3600)));
1307    }
1308
1309    #[test]
1310    fn test_session_store() {
1311        let store = SessionStore::with_defaults();
1312
1313        let id = store.create();
1314        assert!(!id.is_empty());
1315
1316        let session = store.get(&id);
1317        assert!(session.is_some());
1318
1319        store.remove(&id);
1320        assert!(store.get(&id).is_none());
1321    }
1322
1323    #[test]
1324    fn test_streamable_transport() {
1325        let transport = StreamableHttpTransport::new();
1326
1327        // Push a request
1328        let request = JsonRpcRequest::new("test", None, 1i64);
1329        transport.push_request(request);
1330
1331        // Should have a request in queue
1332        let guard = transport.requests.lock().unwrap();
1333        assert_eq!(guard.len(), 1);
1334    }
1335
1336    #[test]
1337    fn test_http_error_display() {
1338        let err = HttpError::InvalidMethod("PATCH".to_string());
1339        assert!(err.to_string().contains("PATCH"));
1340
1341        let err = HttpError::Timeout;
1342        assert!(err.to_string().contains("timeout"));
1343    }
1344
1345    #[test]
1346    fn test_generate_session_id() {
1347        let id1 = generate_session_id();
1348        let id2 = generate_session_id();
1349
1350        assert_ne!(id1, id2);
1351        assert_eq!(id1.len(), 16);
1352    }
1353
1354    #[test]
1355    fn test_http_transport_read_request_chunked_body_and_query() {
1356        use std::io::Cursor;
1357
1358        let body = br#"{"jsonrpc":"2.0","method":"test","id":1}"#;
1359        let body1 = &body[..10];
1360        let body2 = &body[10..];
1361
1362        let raw = format!(
1363            "POST /mcp/v1?foo=bar&x=y HTTP/1.1\r\n\
1364Host: example.com\r\n\
1365Content-Type: application/json\r\n\
1366Transfer-Encoding: chunked\r\n\
1367\r\n\
1368{:x}\r\n\
1369{}\r\n\
1370{:x}\r\n\
1371{}\r\n\
13720\r\n\
1373\r\n",
1374            body1.len(),
1375            std::str::from_utf8(body1).unwrap(),
1376            body2.len(),
1377            std::str::from_utf8(body2).unwrap(),
1378        );
1379
1380        let reader = Cursor::new(raw.into_bytes());
1381        let mut output = Vec::new();
1382        let mut transport = HttpTransport::new(reader, &mut output);
1383
1384        let req = transport.read_request().unwrap();
1385        assert_eq!(req.method, HttpMethod::Post);
1386        assert_eq!(req.path, "/mcp/v1");
1387        assert_eq!(req.query.get("foo"), Some(&"bar".to_string()));
1388        assert_eq!(req.query.get("x"), Some(&"y".to_string()));
1389        assert_eq!(req.body, body);
1390    }
1391
1392    // =========================================================================
1393    // E2E HTTP Transport Tests (bd-2kv / bd-3fq1)
1394    // =========================================================================
1395
1396    #[test]
1397    fn e2e_http_request_response_flow() {
1398        use fastmcp_protocol::RequestId;
1399        use std::io::Cursor;
1400
1401        // Build an HTTP request with JSON-RPC body
1402        let json_rpc_request = serde_json::json!({
1403            "jsonrpc": "2.0",
1404            "method": "tools/list",
1405            "id": 1
1406        });
1407        let body = serde_json::to_vec(&json_rpc_request).unwrap();
1408
1409        let http_request = format!(
1410            "POST /mcp/v1 HTTP/1.1\r\n\
1411             Content-Type: application/json\r\n\
1412             Content-Length: {}\r\n\
1413             \r\n",
1414            body.len()
1415        );
1416
1417        let mut input = http_request.into_bytes();
1418        input.extend(body);
1419
1420        let reader = Cursor::new(input);
1421        let mut output = Vec::new();
1422
1423        let cx = Cx::for_testing();
1424
1425        {
1426            let mut transport = HttpTransport::new(reader, &mut output);
1427
1428            // Receive the request
1429            let msg = transport.recv(&cx).unwrap();
1430            assert!(
1431                matches!(msg, JsonRpcMessage::Request(_)),
1432                "Expected request"
1433            );
1434            let JsonRpcMessage::Request(req) = msg else {
1435                return;
1436            };
1437
1438            assert_eq!(req.method, "tools/list");
1439            assert_eq!(req.id, Some(RequestId::Number(1)));
1440
1441            // Send response
1442            let response = JsonRpcResponse {
1443                jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1444                result: Some(serde_json::json!({"tools": []})),
1445                error: None,
1446                id: Some(RequestId::Number(1)),
1447            };
1448            transport
1449                .send(&cx, &JsonRpcMessage::Response(response))
1450                .unwrap();
1451        }
1452
1453        // Verify HTTP response
1454        let response_str = String::from_utf8(output).unwrap();
1455        assert!(response_str.starts_with("HTTP/1.1 200 OK\r\n"));
1456        assert!(response_str.contains("content-type: application/json"));
1457        assert!(response_str.contains("\"tools\":[]"));
1458    }
1459
1460    #[test]
1461    fn e2e_http_error_status_codes() {
1462        let handler = HttpRequestHandler::new();
1463
1464        // Invalid method should return error
1465        let request = HttpRequest::new(HttpMethod::Get, "/mcp/v1")
1466            .with_header("Content-Type", "application/json");
1467        let result = handler.parse_request(&request);
1468        assert!(matches!(result, Err(HttpError::InvalidMethod(_))));
1469
1470        // Invalid content type
1471        let request =
1472            HttpRequest::new(HttpMethod::Post, "/mcp/v1").with_header("Content-Type", "text/xml");
1473        let result = handler.parse_request(&request);
1474        assert!(matches!(result, Err(HttpError::InvalidContentType(_))));
1475
1476        // Create error response
1477        let response = handler.error_response(HttpStatus::BAD_REQUEST, "Invalid request format");
1478        assert_eq!(response.status, HttpStatus::BAD_REQUEST);
1479        let body_str = String::from_utf8(response.body).unwrap();
1480        assert!(body_str.contains("\"error\""));
1481    }
1482
1483    #[test]
1484    fn e2e_http_content_type_handling() {
1485        let handler = HttpRequestHandler::new();
1486
1487        // Standard JSON content type
1488        let request = HttpRequest::new(HttpMethod::Post, "/mcp/v1")
1489            .with_header("Content-Type", "application/json")
1490            .with_body(r#"{"jsonrpc":"2.0","method":"test","id":1}"#);
1491        assert!(handler.parse_request(&request).is_ok());
1492
1493        // JSON with charset
1494        let request = HttpRequest::new(HttpMethod::Post, "/mcp/v1")
1495            .with_header("Content-Type", "application/json; charset=utf-8")
1496            .with_body(r#"{"jsonrpc":"2.0","method":"test","id":1}"#);
1497        assert!(handler.parse_request(&request).is_ok());
1498
1499        // Response content type is always application/json
1500        let response = JsonRpcResponse {
1501            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1502            result: Some(serde_json::json!({})),
1503            error: None,
1504            id: Some(fastmcp_protocol::RequestId::Number(1)),
1505        };
1506        let http_response = handler.create_response(&response, None);
1507        assert_eq!(
1508            http_response.headers.get("content-type"),
1509            Some(&"application/json".to_string())
1510        );
1511    }
1512
1513    #[test]
1514    fn e2e_http_cors_handling() {
1515        let config = HttpHandlerConfig {
1516            allow_cors: true,
1517            cors_origins: vec!["https://allowed.com".to_string()],
1518            ..Default::default()
1519        };
1520        let handler = HttpRequestHandler::with_config(config);
1521
1522        // Allowed origin
1523        assert!(handler.is_origin_allowed("https://allowed.com"));
1524
1525        // Disallowed origin
1526        assert!(!handler.is_origin_allowed("https://evil.com"));
1527
1528        // OPTIONS request from allowed origin
1529        let request = HttpRequest::new(HttpMethod::Options, "/mcp/v1")
1530            .with_header("Origin", "https://allowed.com");
1531        let response = handler.handle_options(&request);
1532        assert_eq!(response.status, HttpStatus::OK);
1533        assert_eq!(
1534            response.headers.get("access-control-allow-origin"),
1535            Some(&"https://allowed.com".to_string())
1536        );
1537
1538        // OPTIONS request from disallowed origin
1539        let request = HttpRequest::new(HttpMethod::Options, "/mcp/v1")
1540            .with_header("Origin", "https://evil.com");
1541        let response = handler.handle_options(&request);
1542        assert_eq!(response.status, HttpStatus::FORBIDDEN);
1543    }
1544
1545    #[test]
1546    fn e2e_http_streaming_transport() {
1547        use fastmcp_protocol::RequestId;
1548
1549        let mut transport = StreamableHttpTransport::new();
1550        let cx = Cx::for_testing();
1551
1552        // Simulate multiple requests being pushed (from HTTP handlers)
1553        let req1 = JsonRpcRequest::new("method1", None, 1i64);
1554        let req2 = JsonRpcRequest::new("method2", None, 2i64);
1555        transport.push_request(req1);
1556        transport.push_request(req2);
1557
1558        // Transport should receive requests in FIFO order.
1559        let msg = transport.recv(&cx).unwrap();
1560        if let JsonRpcMessage::Request(req) = msg {
1561            assert_eq!(req.method, "method1");
1562        }
1563
1564        // Send a response
1565        let response = JsonRpcResponse {
1566            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1567            result: Some(serde_json::json!({})),
1568            error: None,
1569            id: Some(RequestId::Number(2)),
1570        };
1571        transport
1572            .send(&cx, &JsonRpcMessage::Response(response))
1573            .unwrap();
1574
1575        // Response should be available for streaming
1576        assert!(transport.has_responses());
1577        let resp = transport.pop_response().unwrap();
1578        assert_eq!(resp.id, Some(RequestId::Number(2)));
1579    }
1580
1581    #[test]
1582    fn e2e_http_streaming_response_queue_is_fifo() {
1583        use fastmcp_protocol::RequestId;
1584
1585        let mut transport = StreamableHttpTransport::new();
1586        let cx = Cx::for_testing();
1587
1588        let first = JsonRpcResponse {
1589            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1590            result: Some(serde_json::json!({"seq": 1})),
1591            error: None,
1592            id: Some(RequestId::Number(1)),
1593        };
1594        let second = JsonRpcResponse {
1595            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1596            result: Some(serde_json::json!({"seq": 2})),
1597            error: None,
1598            id: Some(RequestId::Number(2)),
1599        };
1600
1601        transport
1602            .send(&cx, &JsonRpcMessage::Response(first))
1603            .unwrap();
1604        transport
1605            .send(&cx, &JsonRpcMessage::Response(second))
1606            .unwrap();
1607
1608        let first_out = transport.pop_response().expect("first response");
1609        let second_out = transport.pop_response().expect("second response");
1610        assert_eq!(first_out.id, Some(RequestId::Number(1)));
1611        assert_eq!(second_out.id, Some(RequestId::Number(2)));
1612    }
1613
1614    #[test]
1615    fn e2e_http_streaming_rejects_server_to_client_requests() {
1616        let mut transport = StreamableHttpTransport::new();
1617        let cx = Cx::for_testing();
1618        let request = JsonRpcRequest::notification("notifications/message", None);
1619
1620        let err = transport
1621            .send(&cx, &JsonRpcMessage::Request(request))
1622            .expect_err("streamable transport must reject server-to-client requests");
1623
1624        assert!(matches!(err, TransportError::Io(_)));
1625    }
1626
1627    #[test]
1628    fn e2e_http_streaming_send_fails_when_response_queue_poisoned() {
1629        use fastmcp_protocol::RequestId;
1630        use std::thread;
1631
1632        let mut transport = StreamableHttpTransport::new();
1633        let queue = transport.response_queue();
1634        let poison = thread::spawn(move || {
1635            let _guard = queue.lock().expect("lock response queue");
1636            std::panic::panic_any("poison response queue");
1637        });
1638        assert!(poison.join().is_err());
1639
1640        let cx = Cx::for_testing();
1641        let response = JsonRpcResponse {
1642            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1643            result: Some(serde_json::json!({"ok": true})),
1644            error: None,
1645            id: Some(RequestId::Number(1)),
1646        };
1647
1648        let err = transport
1649            .send(&cx, &JsonRpcMessage::Response(response))
1650            .expect_err("poisoned response queue must fail send");
1651        assert!(matches!(err, TransportError::Io(_)));
1652    }
1653
1654    #[test]
1655    fn e2e_http_streaming_recv_fails_when_request_queue_poisoned() {
1656        use std::thread;
1657
1658        let mut transport = StreamableHttpTransport::new();
1659        let queue = transport.request_queue();
1660        let poison = thread::spawn(move || {
1661            let _guard = queue.lock().expect("lock request queue");
1662            std::panic::panic_any("poison request queue");
1663        });
1664        assert!(poison.join().is_err());
1665
1666        let cx = Cx::for_testing();
1667        let err = transport
1668            .recv(&cx)
1669            .expect_err("poisoned request queue must fail recv");
1670        assert!(matches!(err, TransportError::Io(_)));
1671    }
1672
1673    #[test]
1674    fn e2e_http_streaming_push_request_recovers_from_poisoned_queue() {
1675        use std::thread;
1676
1677        let transport = StreamableHttpTransport::new();
1678        let queue = transport.request_queue();
1679        let poison_queue = Arc::clone(&queue);
1680        let poison = thread::spawn(move || {
1681            let _guard = poison_queue.lock().expect("lock request queue");
1682            std::panic::panic_any("poison request queue");
1683        });
1684        assert!(poison.join().is_err());
1685
1686        transport.push_request(JsonRpcRequest::new("recovered-method", None, 7i64));
1687
1688        let guard = match queue.lock() {
1689            Ok(guard) => guard,
1690            Err(poisoned) => poisoned.into_inner(),
1691        };
1692        assert_eq!(guard.len(), 1);
1693        assert_eq!(guard[0].method, "recovered-method");
1694    }
1695
1696    #[test]
1697    fn e2e_http_streaming_response_helpers_recover_from_poisoned_queue() {
1698        use fastmcp_protocol::RequestId;
1699        use std::thread;
1700
1701        let transport = StreamableHttpTransport::new();
1702        let queue = transport.response_queue();
1703        {
1704            let mut guard = queue.lock().expect("lock response queue");
1705            guard.push_back(JsonRpcResponse {
1706                jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1707                result: Some(serde_json::json!({"seq": 9})),
1708                error: None,
1709                id: Some(RequestId::Number(9)),
1710            });
1711        }
1712
1713        let poison_queue = Arc::clone(&queue);
1714        let poison = thread::spawn(move || {
1715            let _guard = poison_queue.lock().expect("lock response queue");
1716            std::panic::panic_any("poison response queue");
1717        });
1718        assert!(poison.join().is_err());
1719
1720        assert!(transport.has_responses());
1721        let response = transport
1722            .pop_response()
1723            .expect("poisoned queue should still yield response");
1724        assert_eq!(response.id, Some(RequestId::Number(9)));
1725    }
1726
1727    #[test]
1728    fn e2e_http_session_lifecycle() {
1729        let store = SessionStore::new(Duration::from_millis(100));
1730
1731        // Create session
1732        let id = store.create();
1733        assert_eq!(store.count(), 1);
1734
1735        // Get and modify session
1736        let mut session = store.get(&id).unwrap();
1737        session.set("user_id", serde_json::json!(42));
1738        store.update(session);
1739
1740        // Retrieve and verify
1741        let session = store.get(&id).unwrap();
1742        assert_eq!(session.get("user_id"), Some(&serde_json::json!(42)));
1743
1744        // Wait for expiration
1745        std::thread::sleep(Duration::from_millis(150));
1746
1747        // Session should be expired
1748        assert!(store.get(&id).is_none());
1749    }
1750
1751    #[test]
1752    fn e2e_http_transport_cancellation() {
1753        use std::io::Cursor;
1754
1755        let reader = Cursor::new(Vec::<u8>::new());
1756        let mut output = Vec::new();
1757
1758        let cx = Cx::for_testing();
1759        cx.set_cancel_requested(true);
1760
1761        let mut transport = HttpTransport::new(reader, &mut output);
1762
1763        // Send should respect cancellation
1764        let response = JsonRpcResponse {
1765            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1766            result: None,
1767            error: None,
1768            id: None,
1769        };
1770        let result = transport.send(&cx, &JsonRpcMessage::Response(response));
1771        assert!(matches!(result, Err(TransportError::Cancelled)));
1772
1773        // Nothing should be written
1774        assert!(output.is_empty());
1775    }
1776
1777    #[test]
1778    fn e2e_http_transport_close() {
1779        use std::io::Cursor;
1780
1781        let reader = Cursor::new(Vec::<u8>::new());
1782        let mut output = Vec::new();
1783
1784        let cx = Cx::for_testing();
1785        let mut transport = HttpTransport::new(reader, &mut output);
1786
1787        // Close transport
1788        transport.close().unwrap();
1789
1790        // Operations should fail after close
1791        let response = JsonRpcResponse {
1792            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1793            result: None,
1794            error: None,
1795            id: None,
1796        };
1797        let result = transport.send(&cx, &JsonRpcMessage::Response(response));
1798        assert!(matches!(result, Err(TransportError::Closed)));
1799    }
1800
1801    #[test]
1802    fn e2e_http_body_size_limit() {
1803        let config = HttpHandlerConfig {
1804            max_body_size: 100,
1805            ..Default::default()
1806        };
1807        let handler = HttpRequestHandler::with_config(config);
1808
1809        // Body exceeding limit
1810        let large_body = vec![b'x'; 200];
1811        let request = HttpRequest::new(HttpMethod::Post, "/mcp/v1")
1812            .with_header("Content-Type", "application/json")
1813            .with_body(large_body);
1814
1815        let result = handler.parse_request(&request);
1816        assert!(matches!(result, Err(HttpError::BodyTooLarge { .. })));
1817    }
1818
1819    #[test]
1820    fn http_method_as_str_round_trips() {
1821        let methods = [
1822            HttpMethod::Get,
1823            HttpMethod::Post,
1824            HttpMethod::Put,
1825            HttpMethod::Delete,
1826            HttpMethod::Options,
1827            HttpMethod::Head,
1828            HttpMethod::Patch,
1829        ];
1830        for m in methods {
1831            let s = m.as_str();
1832            let parsed = HttpMethod::parse(s).unwrap();
1833            assert_eq!(parsed, m);
1834        }
1835    }
1836
1837    #[test]
1838    fn http_status_boundary_cases() {
1839        assert!(HttpStatus(299).is_success());
1840        assert!(!HttpStatus(300).is_success());
1841        assert!(HttpStatus(499).is_client_error());
1842        assert!(!HttpStatus(500).is_client_error());
1843        assert!(HttpStatus(599).is_server_error());
1844        assert!(!HttpStatus(600).is_server_error());
1845    }
1846
1847    #[test]
1848    fn http_request_content_type_and_authorization() {
1849        let req = HttpRequest::new(HttpMethod::Get, "/")
1850            .with_header("Content-Type", "text/plain")
1851            .with_header("Authorization", "Bearer token123");
1852        assert_eq!(req.content_type(), Some("text/plain"));
1853        assert_eq!(req.authorization(), Some("Bearer token123"));
1854    }
1855
1856    #[test]
1857    fn http_request_json_parse() {
1858        let body = serde_json::json!({"key": "value"});
1859        let req =
1860            HttpRequest::new(HttpMethod::Post, "/").with_body(serde_json::to_vec(&body).unwrap());
1861        let parsed: serde_json::Value = req.json().unwrap();
1862        assert_eq!(parsed["key"], "value");
1863    }
1864
1865    #[test]
1866    fn http_response_convenience_constructors() {
1867        let bad = HttpResponse::bad_request();
1868        assert_eq!(bad.status, HttpStatus::BAD_REQUEST);
1869        let err = HttpResponse::internal_error();
1870        assert_eq!(err.status, HttpStatus::INTERNAL_SERVER_ERROR);
1871    }
1872
1873    #[test]
1874    fn http_handler_config_defaults() {
1875        let config = HttpHandlerConfig::default();
1876        assert_eq!(config.base_path, "/mcp/v1");
1877        assert!(config.allow_cors);
1878        assert_eq!(config.cors_origins, vec!["*".to_string()]);
1879        assert_eq!(config.timeout, Duration::from_secs(30));
1880        assert_eq!(config.max_body_size, 10 * 1024 * 1024);
1881    }
1882
1883    #[test]
1884    fn http_handler_config_accessor() {
1885        let handler = HttpRequestHandler::new();
1886        assert_eq!(handler.config().base_path, "/mcp/v1");
1887    }
1888
1889    #[test]
1890    fn http_error_display_all_variants() {
1891        let cases: Vec<(HttpError, &str)> = vec![
1892            (
1893                HttpError::InvalidMethod("X".into()),
1894                "invalid HTTP method: X",
1895            ),
1896            (
1897                HttpError::InvalidContentType("text/xml".into()),
1898                "invalid content type: text/xml",
1899            ),
1900            (
1901                HttpError::HeadersTooLarge { size: 100, max: 50 },
1902                "headers too large: 100 > 50",
1903            ),
1904            (
1905                HttpError::BodyTooLarge {
1906                    size: 200,
1907                    max: 100,
1908                },
1909                "body too large: 200 > 100",
1910            ),
1911            (
1912                HttpError::UnsupportedTransferEncoding("gzip".into()),
1913                "unsupported transfer encoding: gzip",
1914            ),
1915            (HttpError::Timeout, "request timeout"),
1916            (HttpError::Closed, "connection closed"),
1917        ];
1918        for (err, expected) in cases {
1919            assert!(
1920                err.to_string().contains(expected),
1921                "expected '{}' in '{}'",
1922                expected,
1923                err
1924            );
1925        }
1926    }
1927
1928    #[test]
1929    fn http_error_from_codec_error() {
1930        let codec_err = CodecError::MessageTooLarge(999);
1931        let http_err: HttpError = codec_err.into();
1932        assert!(matches!(http_err, HttpError::CodecError(_)));
1933        assert!(http_err.to_string().contains("codec error"));
1934    }
1935
1936    #[test]
1937    fn http_error_from_transport_error() {
1938        let transport_err = TransportError::Closed;
1939        let http_err: HttpError = transport_err.into();
1940        assert!(matches!(http_err, HttpError::Transport(_)));
1941        assert!(http_err.to_string().contains("transport error"));
1942    }
1943
1944    #[test]
1945    fn http_transport_send_rejects_request_messages() {
1946        use std::io::Cursor;
1947
1948        let reader = Cursor::new(Vec::<u8>::new());
1949        let mut output = Vec::new();
1950        let cx = Cx::for_testing();
1951        let mut transport = HttpTransport::new(reader, &mut output);
1952
1953        let request = JsonRpcRequest::new("test", None, 1i64);
1954        let result = transport.send(&cx, &JsonRpcMessage::Request(request));
1955        assert!(result.is_err());
1956    }
1957
1958    #[test]
1959    fn session_store_cleanup_removes_expired() {
1960        let store = SessionStore::new(Duration::from_millis(50));
1961        let _id1 = store.create();
1962        let _id2 = store.create();
1963        assert_eq!(store.count(), 2);
1964
1965        std::thread::sleep(Duration::from_millis(100));
1966        store.cleanup();
1967        assert_eq!(store.count(), 0);
1968    }
1969
1970    #[test]
1971    fn handle_options_cors_disabled() {
1972        let config = HttpHandlerConfig {
1973            allow_cors: false,
1974            ..Default::default()
1975        };
1976        let handler = HttpRequestHandler::with_config(config);
1977        let request = HttpRequest::new(HttpMethod::Options, "/mcp/v1");
1978        let response = handler.handle_options(&request);
1979        assert_eq!(response.status, HttpStatus::METHOD_NOT_ALLOWED);
1980    }
1981}