1use std::collections::{HashMap, VecDeque};
52use std::io::{Read, Write};
53use std::sync::{Arc, Mutex};
54use std::time::{Duration, Instant};
55
56use asupersync::Cx;
57use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
58
59use crate::{Codec, CodecError, Transport, TransportError};
60
/// HTTP request methods understood by this transport.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpMethod {
    /// `GET`
    Get,
    /// `POST`
    Post,
    /// `PUT`
    Put,
    /// `DELETE`
    Delete,
    /// `OPTIONS`
    Options,
    /// `HEAD`
    Head,
    /// `PATCH`
    Patch,
}

impl HttpMethod {
    /// Every supported method; drives the lookup in [`HttpMethod::parse`].
    const ALL: [Self; 7] = [
        Self::Get,
        Self::Post,
        Self::Put,
        Self::Delete,
        Self::Options,
        Self::Head,
        Self::Patch,
    ];

    /// Parses a method token, ignoring ASCII case.
    ///
    /// Returns `None` when `s` is not one of the supported methods.
    ///
    /// The comparison is deliberately ASCII-only: Unicode upper-casing maps
    /// characters such as U+017F (`ſ`) onto ASCII letters, which would let a
    /// token like `"POſT"` be accepted as `POST`. It also avoids allocating
    /// an upper-cased copy of the input.
    #[must_use]
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|method| s.eq_ignore_ascii_case(method.as_str()))
    }

    /// Returns the canonical upper-case token for this method.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Get => "GET",
            Self::Post => "POST",
            Self::Put => "PUT",
            Self::Delete => "DELETE",
            Self::Options => "OPTIONS",
            Self::Head => "HEAD",
            Self::Patch => "PATCH",
        }
    }
}
107
/// Newtype around a numeric HTTP status code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HttpStatus(pub u16);

impl HttpStatus {
    /// Status code 200.
    pub const OK: Self = Self(200);
    /// Status code 202.
    pub const ACCEPTED: Self = Self(202);
    /// Status code 400.
    pub const BAD_REQUEST: Self = Self(400);
    /// Status code 401.
    pub const UNAUTHORIZED: Self = Self(401);
    /// Status code 403.
    pub const FORBIDDEN: Self = Self(403);
    /// Status code 404.
    pub const NOT_FOUND: Self = Self(404);
    /// Status code 405.
    pub const METHOD_NOT_ALLOWED: Self = Self(405);
    /// Status code 500.
    pub const INTERNAL_SERVER_ERROR: Self = Self(500);
    /// Status code 503.
    pub const SERVICE_UNAVAILABLE: Self = Self(503);

    /// Returns `true` when the code lies in `200..=299`.
    #[must_use]
    pub fn is_success(&self) -> bool {
        matches!(self.0, 200..=299)
    }

    /// Returns `true` when the code lies in `400..=499`.
    #[must_use]
    pub fn is_client_error(&self) -> bool {
        matches!(self.0, 400..=499)
    }

    /// Returns `true` when the code lies in `500..=599`.
    #[must_use]
    pub fn is_server_error(&self) -> bool {
        matches!(self.0, 500..=599)
    }
}
141
142#[derive(Debug, Clone)]
144pub struct HttpRequest {
145 pub method: HttpMethod,
147 pub path: String,
149 pub headers: HashMap<String, String>,
151 pub body: Vec<u8>,
153 pub query: HashMap<String, String>,
155}
156
157impl HttpRequest {
158 #[must_use]
160 pub fn new(method: HttpMethod, path: impl Into<String>) -> Self {
161 Self {
162 method,
163 path: path.into(),
164 headers: HashMap::new(),
165 body: Vec::new(),
166 query: HashMap::new(),
167 }
168 }
169
170 #[must_use]
172 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
173 self.headers
174 .insert(name.into().to_lowercase(), value.into());
175 self
176 }
177
178 #[must_use]
180 pub fn with_body(mut self, body: impl Into<Vec<u8>>) -> Self {
181 self.body = body.into();
182 self
183 }
184
185 #[must_use]
187 pub fn with_query(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
188 self.query.insert(name.into(), value.into());
189 self
190 }
191
192 #[must_use]
194 pub fn header(&self, name: &str) -> Option<&str> {
195 self.headers.get(&name.to_lowercase()).map(String::as_str)
196 }
197
198 #[must_use]
200 pub fn content_type(&self) -> Option<&str> {
201 self.header("content-type")
202 }
203
204 #[must_use]
206 pub fn authorization(&self) -> Option<&str> {
207 self.header("authorization")
208 }
209
210 pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
212 serde_json::from_slice(&self.body)
213 }
214}
215
216#[derive(Debug, Clone)]
218pub struct HttpResponse {
219 pub status: HttpStatus,
221 pub headers: HashMap<String, String>,
223 pub body: Vec<u8>,
225}
226
227impl HttpResponse {
228 #[must_use]
230 pub fn new(status: HttpStatus) -> Self {
231 let mut headers = HashMap::new();
232 headers.insert("content-type".to_string(), "application/json".to_string());
233 Self {
234 status,
235 headers,
236 body: Vec::new(),
237 }
238 }
239
240 #[must_use]
242 pub fn ok() -> Self {
243 Self::new(HttpStatus::OK)
244 }
245
246 #[must_use]
248 pub fn bad_request() -> Self {
249 Self::new(HttpStatus::BAD_REQUEST)
250 }
251
252 #[must_use]
254 pub fn internal_error() -> Self {
255 Self::new(HttpStatus::INTERNAL_SERVER_ERROR)
256 }
257
258 #[must_use]
260 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
261 self.headers
262 .insert(name.into().to_lowercase(), value.into());
263 self
264 }
265
266 #[must_use]
268 pub fn with_body(mut self, body: impl Into<Vec<u8>>) -> Self {
269 self.body = body.into();
270 self
271 }
272
273 #[must_use]
275 pub fn with_json<T: serde::Serialize>(mut self, value: &T) -> Self {
276 self.body = serde_json::to_vec(value).unwrap_or_default();
277 self.headers
278 .insert("content-type".to_string(), "application/json".to_string());
279 self
280 }
281
282 #[must_use]
284 pub fn with_cors(mut self, origin: &str) -> Self {
285 self.headers.insert(
286 "access-control-allow-origin".to_string(),
287 origin.to_string(),
288 );
289 self.headers.insert(
290 "access-control-allow-methods".to_string(),
291 "GET, POST, OPTIONS".to_string(),
292 );
293 self.headers.insert(
294 "access-control-allow-headers".to_string(),
295 "Content-Type, Authorization".to_string(),
296 );
297 self
298 }
299}
300
301#[derive(Debug)]
307pub enum HttpError {
308 InvalidMethod(String),
310 InvalidContentType(String),
312 HeadersTooLarge { size: usize, max: usize },
314 BodyTooLarge { size: usize, max: usize },
316 UnsupportedTransferEncoding(String),
318 JsonError(serde_json::Error),
320 CodecError(CodecError),
322 Timeout,
324 Closed,
326 Transport(TransportError),
328}
329
330impl std::fmt::Display for HttpError {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 match self {
333 Self::InvalidMethod(m) => write!(f, "invalid HTTP method: {}", m),
334 Self::InvalidContentType(ct) => write!(f, "invalid content type: {}", ct),
335 Self::HeadersTooLarge { size, max } => {
336 write!(f, "headers too large: {size} > {max} bytes")
337 }
338 Self::BodyTooLarge { size, max } => write!(f, "body too large: {size} > {max} bytes"),
339 Self::UnsupportedTransferEncoding(te) => {
340 write!(f, "unsupported transfer encoding: {}", te)
341 }
342 Self::JsonError(e) => write!(f, "JSON error: {}", e),
343 Self::CodecError(e) => write!(f, "codec error: {}", e),
344 Self::Timeout => write!(f, "request timeout"),
345 Self::Closed => write!(f, "connection closed"),
346 Self::Transport(e) => write!(f, "transport error: {}", e),
347 }
348 }
349}
350
351impl std::error::Error for HttpError {}
352
353impl From<serde_json::Error> for HttpError {
354 fn from(err: serde_json::Error) -> Self {
355 Self::JsonError(err)
356 }
357}
358
359impl From<CodecError> for HttpError {
360 fn from(err: CodecError) -> Self {
361 Self::CodecError(err)
362 }
363}
364
365impl From<TransportError> for HttpError {
366 fn from(err: TransportError) -> Self {
367 Self::Transport(err)
368 }
369}
370
/// Settings for the HTTP request handler.
#[derive(Debug, Clone)]
pub struct HttpHandlerConfig {
    /// Base path of the endpoint.
    pub base_path: String,
    /// Whether CORS handling is enabled.
    pub allow_cors: bool,
    /// Origins accepted for CORS; `"*"` accepts any origin.
    pub cors_origins: Vec<String>,
    /// Request timeout.
    pub timeout: Duration,
    /// Largest accepted request body, in bytes.
    pub max_body_size: usize,
}

impl Default for HttpHandlerConfig {
    /// Defaults: base path `/mcp/v1`, CORS enabled for every origin, a
    /// 30 second timeout and a 10 MiB body limit.
    fn default() -> Self {
        const TEN_MIB: usize = 10 * 1024 * 1024;

        Self {
            base_path: String::from("/mcp/v1"),
            allow_cors: true,
            cors_origins: vec![String::from("*")],
            timeout: Duration::from_secs(30),
            max_body_size: TEN_MIB,
        }
    }
}
401
402pub struct HttpRequestHandler {
408 config: HttpHandlerConfig,
409 codec: Codec,
410}
411
412impl HttpRequestHandler {
413 #[must_use]
415 pub fn new() -> Self {
416 Self::with_config(HttpHandlerConfig::default())
417 }
418
419 #[must_use]
421 pub fn with_config(config: HttpHandlerConfig) -> Self {
422 Self {
423 config,
424 codec: Codec::new(),
425 }
426 }
427
428 #[must_use]
430 pub fn config(&self) -> &HttpHandlerConfig {
431 &self.config
432 }
433
434 #[must_use]
436 pub fn handle_options(&self, request: &HttpRequest) -> HttpResponse {
437 if !self.config.allow_cors {
438 return HttpResponse::new(HttpStatus::METHOD_NOT_ALLOWED);
439 }
440
441 let origin = request.header("origin").unwrap_or("*");
442 let allowed = self.is_origin_allowed(origin);
443
444 if !allowed {
445 return HttpResponse::new(HttpStatus::FORBIDDEN);
446 }
447
448 HttpResponse::new(HttpStatus::OK)
449 .with_cors(origin)
450 .with_header("access-control-max-age", "86400")
451 }
452
453 #[must_use]
455 pub fn is_origin_allowed(&self, origin: &str) -> bool {
456 self.config
457 .cors_origins
458 .iter()
459 .any(|o| o == "*" || o == origin)
460 }
461
462 pub fn parse_request(&self, request: &HttpRequest) -> Result<JsonRpcRequest, HttpError> {
464 if request.method != HttpMethod::Post {
466 return Err(HttpError::InvalidMethod(
467 request.method.as_str().to_string(),
468 ));
469 }
470
471 let content_type = request.content_type().unwrap_or("");
473 if !content_type.starts_with("application/json") {
474 return Err(HttpError::InvalidContentType(content_type.to_string()));
475 }
476
477 if request.body.len() > self.config.max_body_size {
479 return Err(HttpError::BodyTooLarge {
480 size: request.body.len(),
481 max: self.config.max_body_size,
482 });
483 }
484
485 let json_rpc: JsonRpcRequest = serde_json::from_slice(&request.body)?;
487 Ok(json_rpc)
488 }
489
490 #[must_use]
492 pub fn create_response(
493 &self,
494 response: &JsonRpcResponse,
495 origin: Option<&str>,
496 ) -> HttpResponse {
497 let body = self.codec.encode_response(response).unwrap_or_default();
498
499 let mut http_response = HttpResponse::ok()
500 .with_body(body)
501 .with_header("content-type", "application/json");
502
503 if self.config.allow_cors {
504 if let Some(origin) = origin {
505 if self.is_origin_allowed(origin) {
506 http_response = http_response.with_cors(origin);
507 }
508 }
509 }
510
511 http_response
512 }
513
514 #[must_use]
516 pub fn error_response(&self, status: HttpStatus, message: &str) -> HttpResponse {
517 let error = serde_json::json!({
518 "error": {
519 "code": -32600,
520 "message": message
521 }
522 });
523
524 HttpResponse::new(status).with_json(&error)
525 }
526}
527
528impl Default for HttpRequestHandler {
529 fn default() -> Self {
530 Self::new()
531 }
532}
533
534pub struct HttpTransport<R, W> {
544 reader: R,
545 writer: W,
546 codec: Codec,
547 closed: bool,
548 pending_responses: Vec<JsonRpcResponse>,
549}
550
551impl<R: Read, W: Write> HttpTransport<R, W> {
552 #[must_use]
554 pub fn new(reader: R, writer: W) -> Self {
555 Self {
556 reader,
557 writer,
558 codec: Codec::new(),
559 closed: false,
560 pending_responses: Vec::new(),
561 }
562 }
563
564 pub fn read_request(&mut self) -> Result<HttpRequest, HttpError> {
566 const MAX_HEADERS_SIZE: usize = 64 * 1024;
567 const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
568
569 let mut buffer = Vec::new();
570 let mut byte = [0u8; 1];
571
572 loop {
574 if self
575 .reader
576 .read(&mut byte)
577 .map_err(|e| HttpError::Transport(e.into()))?
578 == 0
579 {
580 return Err(HttpError::Closed);
581 }
582 buffer.push(byte[0]);
583
584 if buffer.ends_with(b"\r\n\r\n") {
585 break;
586 }
587
588 if buffer.len() > MAX_HEADERS_SIZE {
590 return Err(HttpError::HeadersTooLarge {
591 size: buffer.len(),
592 max: MAX_HEADERS_SIZE,
593 });
594 }
595 }
596
597 let header_str = String::from_utf8_lossy(&buffer);
598 let mut lines = header_str.lines();
599
600 let request_line = lines
602 .next()
603 .ok_or_else(|| HttpError::InvalidMethod("missing request line".to_string()))?;
604
605 let parts: Vec<&str> = request_line.split_whitespace().collect();
606 if parts.len() < 2 {
607 return Err(HttpError::InvalidMethod("invalid request line".to_string()));
608 }
609
610 let method = HttpMethod::parse(parts[0])
611 .ok_or_else(|| HttpError::InvalidMethod(parts[0].to_string()))?;
612
613 let full_path = parts[1];
614 let (path, query_str) = full_path
615 .split_once('?')
616 .map_or((full_path.to_string(), None), |(p, q)| {
617 (p.to_string(), Some(q))
618 });
619
620 let mut query = HashMap::new();
621 if let Some(qs) = query_str {
622 for pair in qs.split('&') {
623 if pair.is_empty() {
624 continue;
625 }
626 let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
627 query.insert(k.to_string(), v.to_string());
628 }
629 }
630
631 let mut headers = HashMap::new();
633 for line in lines {
634 if line.is_empty() {
635 break;
636 }
637 if let Some((name, value)) = line.split_once(':') {
638 headers.insert(name.trim().to_lowercase(), value.trim().to_string());
639 }
640 }
641
642 let mut body = Vec::new();
647
648 if let Some(te) = headers.get("transfer-encoding") {
649 if te.to_ascii_lowercase().contains("chunked") {
650 loop {
652 let mut line = Vec::new();
654 loop {
655 if self
656 .reader
657 .read(&mut byte)
658 .map_err(|e| HttpError::Transport(e.into()))?
659 == 0
660 {
661 return Err(HttpError::Closed);
662 }
663 line.push(byte[0]);
664 if line.ends_with(b"\r\n") {
665 break;
666 }
667 if line.len() > 1024 {
668 return Err(HttpError::InvalidMethod(
669 "invalid chunk size line".to_string(),
670 ));
671 }
672 }
673
674 let line_str = String::from_utf8_lossy(&line);
675 let size_str = line_str.trim().split(';').next().unwrap_or("");
676 let size = usize::from_str_radix(size_str, 16)
677 .map_err(|_| HttpError::InvalidMethod("invalid chunk size".to_string()))?;
678
679 if size == 0 {
680 let mut trailer = Vec::new();
682 loop {
683 trailer.clear();
684 loop {
685 if self
686 .reader
687 .read(&mut byte)
688 .map_err(|e| HttpError::Transport(e.into()))?
689 == 0
690 {
691 return Err(HttpError::Closed);
692 }
693 trailer.push(byte[0]);
694 if trailer.ends_with(b"\r\n") {
695 break;
696 }
697 if trailer.len() > MAX_HEADERS_SIZE {
698 return Err(HttpError::HeadersTooLarge {
699 size: trailer.len(),
700 max: MAX_HEADERS_SIZE,
701 });
702 }
703 }
704 if trailer == b"\r\n" {
705 break;
706 }
707 }
708 break;
709 }
710
711 let mut chunk = vec![0u8; size];
712 self.reader
713 .read_exact(&mut chunk)
714 .map_err(|e| HttpError::Transport(e.into()))?;
715 body.extend_from_slice(&chunk);
716 if body.len() > MAX_BODY_SIZE {
717 return Err(HttpError::BodyTooLarge {
718 size: body.len(),
719 max: MAX_BODY_SIZE,
720 });
721 }
722
723 let mut crlf = [0u8; 2];
725 self.reader
726 .read_exact(&mut crlf)
727 .map_err(|e| HttpError::Transport(e.into()))?;
728 if &crlf != b"\r\n" {
729 return Err(HttpError::InvalidMethod(
730 "invalid chunk terminator".to_string(),
731 ));
732 }
733 }
734 } else {
735 return Err(HttpError::UnsupportedTransferEncoding(te.clone()));
736 }
737 } else {
738 let content_length: usize = headers
740 .get("content-length")
741 .and_then(|s| s.parse().ok())
742 .unwrap_or(0);
743
744 if content_length > MAX_BODY_SIZE {
745 return Err(HttpError::BodyTooLarge {
746 size: content_length,
747 max: MAX_BODY_SIZE,
748 });
749 }
750
751 body.resize(content_length, 0);
752 if content_length > 0 {
753 self.reader
754 .read_exact(&mut body)
755 .map_err(|e| HttpError::Transport(e.into()))?;
756 }
757 }
758
759 Ok(HttpRequest {
760 method,
761 path,
762 headers,
763 body,
764 query,
765 })
766 }
767
768 pub fn write_response(&mut self, response: &HttpResponse) -> Result<(), HttpError> {
770 let status_text = match response.status.0 {
771 200 => "OK",
772 400 => "Bad Request",
773 401 => "Unauthorized",
774 403 => "Forbidden",
775 404 => "Not Found",
776 500 => "Internal Server Error",
777 _ => "Unknown",
778 };
779
780 write!(
782 self.writer,
783 "HTTP/1.1 {} {}\r\n",
784 response.status.0, status_text
785 )
786 .map_err(|e| HttpError::Transport(e.into()))?;
787
788 for (name, value) in &response.headers {
790 write!(self.writer, "{}: {}\r\n", name, value)
791 .map_err(|e| HttpError::Transport(e.into()))?;
792 }
793
794 if !response.headers.contains_key("content-length") {
796 write!(self.writer, "content-length: {}\r\n", response.body.len())
797 .map_err(|e| HttpError::Transport(e.into()))?;
798 }
799
800 write!(self.writer, "\r\n").map_err(|e| HttpError::Transport(e.into()))?;
802
803 self.writer
805 .write_all(&response.body)
806 .map_err(|e| HttpError::Transport(e.into()))?;
807 self.writer
808 .flush()
809 .map_err(|e| HttpError::Transport(e.into()))?;
810
811 Ok(())
812 }
813
814 pub fn queue_response(&mut self, response: JsonRpcResponse) {
816 self.pending_responses.push(response);
817 }
818}
819
820impl<R: Read, W: Write> Transport for HttpTransport<R, W> {
821 fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
822 if cx.is_cancel_requested() {
823 return Err(TransportError::Cancelled);
824 }
825
826 if self.closed {
827 return Err(TransportError::Closed);
828 }
829
830 let response = match message {
831 JsonRpcMessage::Response(r) => r.clone(),
832 JsonRpcMessage::Request(r) => {
833 let _ = r;
839 return Err(TransportError::Io(std::io::Error::other(
840 "HttpTransport cannot send server-to-client requests",
841 )));
842 }
843 };
844
845 let http_response = HttpResponse::ok().with_json(&response);
846
847 self.write_response(&http_response)
848 .map_err(|_| TransportError::Io(std::io::Error::other("write error")))?;
849
850 Ok(())
851 }
852
853 fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
854 if cx.is_cancel_requested() {
855 return Err(TransportError::Cancelled);
856 }
857
858 if self.closed {
859 return Err(TransportError::Closed);
860 }
861
862 let http_request = self.read_request().map_err(|e| match e {
863 HttpError::Closed => TransportError::Closed,
864 HttpError::Timeout => TransportError::Timeout,
865 _ => TransportError::Io(std::io::Error::other(e.to_string())),
866 })?;
867
868 let json_rpc: JsonRpcRequest = serde_json::from_slice(&http_request.body)
870 .map_err(|e| TransportError::Codec(CodecError::Json(e)))?;
871
872 Ok(JsonRpcMessage::Request(json_rpc))
873 }
874
875 fn close(&mut self) -> Result<(), TransportError> {
876 self.closed = true;
877 Ok(())
878 }
879}
880
881pub struct StreamableHttpTransport {
891 requests: Arc<Mutex<VecDeque<JsonRpcRequest>>>,
893 responses: Arc<Mutex<VecDeque<JsonRpcResponse>>>,
895 codec: Codec,
897 closed: bool,
899 poll_interval: Duration,
901}
902
903impl StreamableHttpTransport {
904 #[must_use]
906 pub fn new() -> Self {
907 Self {
908 requests: Arc::new(Mutex::new(VecDeque::new())),
909 responses: Arc::new(Mutex::new(VecDeque::new())),
910 codec: Codec::new(),
911 closed: false,
912 poll_interval: Duration::from_millis(10),
913 }
914 }
915
916 pub fn push_request(&self, request: JsonRpcRequest) {
918 let mut guard = match self.requests.lock() {
919 Ok(guard) => guard,
920 Err(poisoned) => poisoned.into_inner(),
921 };
922 guard.push_back(request);
923 }
924
925 #[must_use]
927 pub fn pop_response(&self) -> Option<JsonRpcResponse> {
928 let mut guard = match self.responses.lock() {
929 Ok(guard) => guard,
930 Err(poisoned) => poisoned.into_inner(),
931 };
932 guard.pop_front()
933 }
934
935 #[must_use]
937 pub fn has_responses(&self) -> bool {
938 match self.responses.lock() {
939 Ok(guard) => !guard.is_empty(),
940 Err(poisoned) => !poisoned.into_inner().is_empty(),
941 }
942 }
943
944 #[must_use]
946 pub fn request_queue(&self) -> Arc<Mutex<VecDeque<JsonRpcRequest>>> {
947 Arc::clone(&self.requests)
948 }
949
950 #[must_use]
952 pub fn response_queue(&self) -> Arc<Mutex<VecDeque<JsonRpcResponse>>> {
953 Arc::clone(&self.responses)
954 }
955}
956
957impl Default for StreamableHttpTransport {
958 fn default() -> Self {
959 Self::new()
960 }
961}
962
963impl Transport for StreamableHttpTransport {
964 fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
965 if cx.is_cancel_requested() {
966 return Err(TransportError::Cancelled);
967 }
968
969 if self.closed {
970 return Err(TransportError::Closed);
971 }
972
973 match message {
974 JsonRpcMessage::Response(response) => {
975 let mut guard = self.responses.lock().map_err(|_| {
976 TransportError::Io(std::io::Error::other(
977 "streamable response queue lock poisoned",
978 ))
979 })?;
980 guard.push_back(response.clone());
981 }
982 JsonRpcMessage::Request(_) => {
983 return Err(TransportError::Io(std::io::Error::other(
988 "StreamableHttpTransport cannot send server-to-client requests",
989 )));
990 }
991 }
992
993 Ok(())
994 }
995
996 fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
997 if cx.is_cancel_requested() {
998 return Err(TransportError::Cancelled);
999 }
1000
1001 if self.closed {
1002 return Err(TransportError::Closed);
1003 }
1004
1005 loop {
1007 if cx.is_cancel_requested() {
1008 return Err(TransportError::Cancelled);
1009 }
1010
1011 let mut guard = self.requests.lock().map_err(|_| {
1012 TransportError::Io(std::io::Error::other(
1013 "streamable request queue lock poisoned",
1014 ))
1015 })?;
1016 if let Some(request) = guard.pop_front() {
1017 return Ok(JsonRpcMessage::Request(request));
1018 }
1019 drop(guard);
1020
1021 std::thread::sleep(self.poll_interval);
1023 }
1024 }
1025
1026 fn close(&mut self) -> Result<(), TransportError> {
1027 self.closed = true;
1028 Ok(())
1029 }
1030}
1031
1032#[derive(Debug, Clone)]
1038pub struct HttpSession {
1039 pub id: String,
1041 pub created_at: Instant,
1043 pub last_activity: Instant,
1045 pub data: HashMap<String, serde_json::Value>,
1047}
1048
1049impl HttpSession {
1050 #[must_use]
1052 pub fn new(id: impl Into<String>) -> Self {
1053 let now = Instant::now();
1054 Self {
1055 id: id.into(),
1056 created_at: now,
1057 last_activity: now,
1058 data: HashMap::new(),
1059 }
1060 }
1061
1062 pub fn touch(&mut self) {
1064 self.last_activity = Instant::now();
1065 }
1066
1067 #[must_use]
1069 pub fn is_expired(&self, timeout: Duration) -> bool {
1070 self.last_activity.elapsed() > timeout
1071 }
1072
1073 #[must_use]
1075 pub fn get(&self, key: &str) -> Option<&serde_json::Value> {
1076 self.data.get(key)
1077 }
1078
1079 pub fn set(&mut self, key: impl Into<String>, value: serde_json::Value) {
1081 self.data.insert(key.into(), value);
1082 self.touch();
1083 }
1084
1085 pub fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
1087 self.touch();
1088 self.data.remove(key)
1089 }
1090}
1091
1092#[derive(Debug, Default)]
1094pub struct SessionStore {
1095 sessions: Mutex<HashMap<String, HttpSession>>,
1096 timeout: Duration,
1097}
1098
1099impl SessionStore {
1100 #[must_use]
1102 pub fn new(timeout: Duration) -> Self {
1103 Self {
1104 sessions: Mutex::new(HashMap::new()),
1105 timeout,
1106 }
1107 }
1108
1109 #[must_use]
1111 pub fn with_defaults() -> Self {
1112 Self::new(Duration::from_secs(3600))
1113 }
1114
1115 #[must_use]
1117 pub fn create(&self) -> String {
1118 let id = generate_session_id();
1119 let session = HttpSession::new(&id);
1120
1121 if let Ok(mut guard) = self.sessions.lock() {
1122 guard.insert(id.clone(), session);
1123 }
1124
1125 id
1126 }
1127
1128 #[must_use]
1130 pub fn get(&self, id: &str) -> Option<HttpSession> {
1131 let mut guard = self.sessions.lock().ok()?;
1132 let session = guard.get_mut(id)?;
1133
1134 if session.is_expired(self.timeout) {
1135 guard.remove(id);
1136 return None;
1137 }
1138
1139 session.touch();
1140 Some(session.clone())
1141 }
1142
1143 pub fn update(&self, session: HttpSession) {
1145 if let Ok(mut guard) = self.sessions.lock() {
1146 guard.insert(session.id.clone(), session);
1147 }
1148 }
1149
1150 pub fn remove(&self, id: &str) {
1152 if let Ok(mut guard) = self.sessions.lock() {
1153 guard.remove(id);
1154 }
1155 }
1156
1157 pub fn cleanup(&self) {
1159 if let Ok(mut guard) = self.sessions.lock() {
1160 guard.retain(|_, s| !s.is_expired(self.timeout));
1161 }
1162 }
1163
1164 #[must_use]
1166 pub fn count(&self) -> usize {
1167 self.sessions.lock().map(|g| g.len()).unwrap_or(0)
1168 }
1169}
1170
/// Produces a 16-character lower-case hexadecimal session id.
///
/// The id is the 64-bit output of a freshly created `RandomState` hasher fed
/// with the current time in nanoseconds since the Unix epoch (zero if the
/// clock reads earlier than the epoch).
fn generate_session_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());

    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(nanos_since_epoch);
    let digest = hasher.finish();

    format!("{digest:016x}")
}
1188
1189#[cfg(test)]
1194mod tests {
1195 use super::*;
1196
1197 #[test]
1198 fn test_http_method_parse() {
1199 assert_eq!(HttpMethod::parse("GET"), Some(HttpMethod::Get));
1200 assert_eq!(HttpMethod::parse("POST"), Some(HttpMethod::Post));
1201 assert_eq!(HttpMethod::parse("get"), Some(HttpMethod::Get));
1202 assert_eq!(HttpMethod::parse("INVALID"), None);
1203 }
1204
1205 #[test]
1206 fn test_http_status() {
1207 assert!(HttpStatus::OK.is_success());
1208 assert!(HttpStatus::BAD_REQUEST.is_client_error());
1209 assert!(HttpStatus::INTERNAL_SERVER_ERROR.is_server_error());
1210 }
1211
1212 #[test]
1213 fn test_http_request_builder() {
1214 let request = HttpRequest::new(HttpMethod::Post, "/api/mcp")
1215 .with_header("Content-Type", "application/json")
1216 .with_body(b"{\"test\": true}".to_vec())
1217 .with_query("version", "1");
1218
1219 assert_eq!(request.method, HttpMethod::Post);
1220 assert_eq!(request.path, "/api/mcp");
1221 assert_eq!(request.header("content-type"), Some("application/json"));
1222 assert_eq!(request.query.get("version"), Some(&"1".to_string()));
1223 }
1224
1225 #[test]
1226 fn test_http_response_builder() {
1227 let response = HttpResponse::ok()
1228 .with_header("X-Custom", "value")
1229 .with_body(b"Hello".to_vec());
1230
1231 assert_eq!(response.status, HttpStatus::OK);
1232 assert_eq!(response.headers.get("x-custom"), Some(&"value".to_string()));
1233 assert_eq!(response.body, b"Hello");
1234 }
1235
1236 #[test]
1237 fn test_http_response_json() {
1238 let data = serde_json::json!({"result": "ok"});
1239 let response = HttpResponse::ok().with_json(&data);
1240
1241 assert!(!response.body.is_empty());
1242 assert_eq!(
1243 response.headers.get("content-type"),
1244 Some(&"application/json".to_string())
1245 );
1246 }
1247
1248 #[test]
1249 fn test_http_response_cors() {
1250 let response = HttpResponse::ok().with_cors("https://example.com");
1251
1252 assert_eq!(
1253 response.headers.get("access-control-allow-origin"),
1254 Some(&"https://example.com".to_string())
1255 );
1256 }
1257
1258 #[test]
1259 fn test_http_handler_options() {
1260 let handler = HttpRequestHandler::new();
1261 let request = HttpRequest::new(HttpMethod::Options, "/mcp/v1")
1262 .with_header("Origin", "https://example.com");
1263
1264 let response = handler.handle_options(&request);
1265 assert_eq!(response.status, HttpStatus::OK);
1266 }
1267
1268 #[test]
1269 fn test_http_handler_parse_request() {
1270 let handler = HttpRequestHandler::new();
1271
1272 let json_rpc = serde_json::json!({
1274 "jsonrpc": "2.0",
1275 "method": "test",
1276 "id": 1
1277 });
1278 let request = HttpRequest::new(HttpMethod::Post, "/mcp/v1")
1279 .with_header("Content-Type", "application/json")
1280 .with_body(serde_json::to_vec(&json_rpc).unwrap());
1281
1282 let result = handler.parse_request(&request);
1283 assert!(result.is_ok());
1284
1285 let request = HttpRequest::new(HttpMethod::Get, "/mcp/v1");
1287 assert!(handler.parse_request(&request).is_err());
1288
1289 let request =
1291 HttpRequest::new(HttpMethod::Post, "/mcp/v1").with_header("Content-Type", "text/plain");
1292 assert!(handler.parse_request(&request).is_err());
1293 }
1294
1295 #[test]
1296 fn test_http_session() {
1297 let mut session = HttpSession::new("test-session");
1298 assert_eq!(session.id, "test-session");
1299
1300 session.set("key", serde_json::json!("value"));
1301 assert_eq!(session.get("key"), Some(&serde_json::json!("value")));
1302
1303 session.remove("key");
1304 assert!(session.get("key").is_none());
1305
1306 assert!(!session.is_expired(Duration::from_secs(3600)));
1307 }
1308
1309 #[test]
1310 fn test_session_store() {
1311 let store = SessionStore::with_defaults();
1312
1313 let id = store.create();
1314 assert!(!id.is_empty());
1315
1316 let session = store.get(&id);
1317 assert!(session.is_some());
1318
1319 store.remove(&id);
1320 assert!(store.get(&id).is_none());
1321 }
1322
1323 #[test]
1324 fn test_streamable_transport() {
1325 let transport = StreamableHttpTransport::new();
1326
1327 let request = JsonRpcRequest::new("test", None, 1i64);
1329 transport.push_request(request);
1330
1331 let guard = transport.requests.lock().unwrap();
1333 assert_eq!(guard.len(), 1);
1334 }
1335
1336 #[test]
1337 fn test_http_error_display() {
1338 let err = HttpError::InvalidMethod("PATCH".to_string());
1339 assert!(err.to_string().contains("PATCH"));
1340
1341 let err = HttpError::Timeout;
1342 assert!(err.to_string().contains("timeout"));
1343 }
1344
1345 #[test]
1346 fn test_generate_session_id() {
1347 let id1 = generate_session_id();
1348 let id2 = generate_session_id();
1349
1350 assert_ne!(id1, id2);
1351 assert_eq!(id1.len(), 16);
1352 }
1353
1354 #[test]
1355 fn test_http_transport_read_request_chunked_body_and_query() {
1356 use std::io::Cursor;
1357
1358 let body = br#"{"jsonrpc":"2.0","method":"test","id":1}"#;
1359 let body1 = &body[..10];
1360 let body2 = &body[10..];
1361
1362 let raw = format!(
1363 "POST /mcp/v1?foo=bar&x=y HTTP/1.1\r\n\
1364Host: example.com\r\n\
1365Content-Type: application/json\r\n\
1366Transfer-Encoding: chunked\r\n\
1367\r\n\
1368{:x}\r\n\
1369{}\r\n\
1370{:x}\r\n\
1371{}\r\n\
13720\r\n\
1373\r\n",
1374 body1.len(),
1375 std::str::from_utf8(body1).unwrap(),
1376 body2.len(),
1377 std::str::from_utf8(body2).unwrap(),
1378 );
1379
1380 let reader = Cursor::new(raw.into_bytes());
1381 let mut output = Vec::new();
1382 let mut transport = HttpTransport::new(reader, &mut output);
1383
1384 let req = transport.read_request().unwrap();
1385 assert_eq!(req.method, HttpMethod::Post);
1386 assert_eq!(req.path, "/mcp/v1");
1387 assert_eq!(req.query.get("foo"), Some(&"bar".to_string()));
1388 assert_eq!(req.query.get("x"), Some(&"y".to_string()));
1389 assert_eq!(req.body, body);
1390 }
1391
1392 #[test]
1397 fn e2e_http_request_response_flow() {
1398 use fastmcp_protocol::RequestId;
1399 use std::io::Cursor;
1400
1401 let json_rpc_request = serde_json::json!({
1403 "jsonrpc": "2.0",
1404 "method": "tools/list",
1405 "id": 1
1406 });
1407 let body = serde_json::to_vec(&json_rpc_request).unwrap();
1408
1409 let http_request = format!(
1410 "POST /mcp/v1 HTTP/1.1\r\n\
1411 Content-Type: application/json\r\n\
1412 Content-Length: {}\r\n\
1413 \r\n",
1414 body.len()
1415 );
1416
1417 let mut input = http_request.into_bytes();
1418 input.extend(body);
1419
1420 let reader = Cursor::new(input);
1421 let mut output = Vec::new();
1422
1423 let cx = Cx::for_testing();
1424
1425 {
1426 let mut transport = HttpTransport::new(reader, &mut output);
1427
1428 let msg = transport.recv(&cx).unwrap();
1430 assert!(
1431 matches!(msg, JsonRpcMessage::Request(_)),
1432 "Expected request"
1433 );
1434 let JsonRpcMessage::Request(req) = msg else {
1435 return;
1436 };
1437
1438 assert_eq!(req.method, "tools/list");
1439 assert_eq!(req.id, Some(RequestId::Number(1)));
1440
1441 let response = JsonRpcResponse {
1443 jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1444 result: Some(serde_json::json!({"tools": []})),
1445 error: None,
1446 id: Some(RequestId::Number(1)),
1447 };
1448 transport
1449 .send(&cx, &JsonRpcMessage::Response(response))
1450 .unwrap();
1451 }
1452
1453 let response_str = String::from_utf8(output).unwrap();
1455 assert!(response_str.starts_with("HTTP/1.1 200 OK\r\n"));
1456 assert!(response_str.contains("content-type: application/json"));
1457 assert!(response_str.contains("\"tools\":[]"));
1458 }
1459
/// Walks the handler's error paths: a GET request, a POST with an XML
/// content type, and the shape of a generated error response.
#[test]
fn e2e_http_error_status_codes() {
    let handler = HttpRequestHandler::new();

    // A GET against the endpoint is reported as an invalid method.
    let get_request = HttpRequest::new(HttpMethod::Get, "/mcp/v1")
        .with_header("Content-Type", "application/json");
    let get_outcome = handler.parse_request(&get_request);
    assert!(matches!(get_outcome, Err(HttpError::InvalidMethod(_))));

    // A POST carrying `text/xml` is reported as an invalid content type.
    let xml_request =
        HttpRequest::new(HttpMethod::Post, "/mcp/v1").with_header("Content-Type", "text/xml");
    let xml_outcome = handler.parse_request(&xml_request);
    assert!(matches!(
        xml_outcome,
        Err(HttpError::InvalidContentType(_))
    ));

    // The error response keeps the requested status and has an "error" key in its body.
    let error_response = handler.error_response(HttpStatus::BAD_REQUEST, "Invalid request format");
    assert_eq!(error_response.status, HttpStatus::BAD_REQUEST);
    let body_text = String::from_utf8(error_response.body).unwrap();
    assert!(body_text.contains("\"error\""));
}
1482
/// Checks which request content types parse successfully and which
/// content type the handler stamps on a JSON-RPC response.
#[test]
fn e2e_http_content_type_handling() {
    let handler = HttpRequestHandler::new();

    // Both the bare media type and the variant with a charset parameter must parse.
    for content_type in ["application/json", "application/json; charset=utf-8"] {
        let request = HttpRequest::new(HttpMethod::Post, "/mcp/v1")
            .with_header("Content-Type", content_type)
            .with_body(r#"{"jsonrpc":"2.0","method":"test","id":1}"#);
        assert!(handler.parse_request(&request).is_ok());
    }

    let rpc_response = JsonRpcResponse {
        id: Some(fastmcp_protocol::RequestId::Number(1)),
        result: Some(serde_json::json!({})),
        error: None,
        jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
    };
    let http_response = handler.create_response(&rpc_response, None);
    let response_content_type = http_response
        .headers
        .get("content-type")
        .map(String::as_str);
    assert_eq!(response_content_type, Some("application/json"));
}
1512
/// Verifies origin allow-listing and the preflight (OPTIONS) responses
/// for an allowed and a disallowed origin.
#[test]
fn e2e_http_cors_handling() {
    let handler = HttpRequestHandler::with_config(HttpHandlerConfig {
        allow_cors: true,
        cors_origins: vec![String::from("https://allowed.com")],
        ..Default::default()
    });

    // Only the configured origin is accepted.
    assert!(handler.is_origin_allowed("https://allowed.com"));
    assert!(!handler.is_origin_allowed("https://evil.com"));

    // Preflight from the allowed origin: 200 plus the origin reflected in the CORS header.
    let allowed_preflight = HttpRequest::new(HttpMethod::Options, "/mcp/v1")
        .with_header("Origin", "https://allowed.com");
    let allowed_response = handler.handle_options(&allowed_preflight);
    assert_eq!(allowed_response.status, HttpStatus::OK);
    let reflected_origin = allowed_response
        .headers
        .get("access-control-allow-origin")
        .map(String::as_str);
    assert_eq!(reflected_origin, Some("https://allowed.com"));

    // Preflight from any other origin is refused.
    let denied_preflight = HttpRequest::new(HttpMethod::Options, "/mcp/v1")
        .with_header("Origin", "https://evil.com");
    let denied_response = handler.handle_options(&denied_preflight);
    assert_eq!(denied_response.status, HttpStatus::FORBIDDEN);
}
1544
/// End-to-end check of the streamable transport: a pushed request comes
/// back out of `recv`, and a response passed to `send` becomes available
/// through `has_responses` / `pop_response`.
#[test]
fn e2e_http_streaming_transport() {
    use fastmcp_protocol::RequestId;

    let mut transport = StreamableHttpTransport::new();
    let cx = Cx::for_testing();

    let req1 = JsonRpcRequest::new("method1", None, 1i64);
    let req2 = JsonRpcRequest::new("method2", None, 2i64);
    transport.push_request(req1);
    transport.push_request(req2);

    // Fail loudly when recv yields anything other than a request. A bare
    // `if let` would let the test pass without ever checking the method.
    let JsonRpcMessage::Request(req) = transport.recv(&cx).unwrap() else {
        panic!("expected recv to yield a request");
    };
    assert_eq!(req.method, "method1");

    let response = JsonRpcResponse {
        jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
        result: Some(serde_json::json!({})),
        error: None,
        id: Some(RequestId::Number(2)),
    };
    transport
        .send(&cx, &JsonRpcMessage::Response(response))
        .unwrap();

    // The sent response must be retrievable with its id intact.
    assert!(transport.has_responses());
    let resp = transport.pop_response().unwrap();
    assert_eq!(resp.id, Some(RequestId::Number(2)));
}
1580
/// Sends two responses and checks that `pop_response` hands them back in
/// the order they were sent.
#[test]
fn e2e_http_streaming_response_queue_is_fifo() {
    use fastmcp_protocol::RequestId;

    let mut transport = StreamableHttpTransport::new();
    let cx = Cx::for_testing();

    let earlier = JsonRpcResponse {
        id: Some(RequestId::Number(1)),
        result: Some(serde_json::json!({"seq": 1})),
        error: None,
        jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
    };
    let later = JsonRpcResponse {
        id: Some(RequestId::Number(2)),
        result: Some(serde_json::json!({"seq": 2})),
        error: None,
        jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
    };

    // Enqueue in a known order.
    for outgoing in [earlier, later] {
        transport
            .send(&cx, &JsonRpcMessage::Response(outgoing))
            .unwrap();
    }

    // Dequeue both, then compare ids against the send order.
    let popped_first = transport.pop_response().expect("first response");
    let popped_second = transport.pop_response().expect("second response");
    assert_eq!(popped_first.id, Some(RequestId::Number(1)));
    assert_eq!(popped_second.id, Some(RequestId::Number(2)));
}
1613
/// Sending a request-shaped message through the streamable transport must
/// fail with an I/O transport error.
#[test]
fn e2e_http_streaming_rejects_server_to_client_requests() {
    let mut transport = StreamableHttpTransport::new();
    let cx = Cx::for_testing();

    let notification = JsonRpcRequest::notification("notifications/message", None);
    let outgoing = JsonRpcMessage::Request(notification);

    let send_error = transport
        .send(&cx, &outgoing)
        .expect_err("streamable transport must reject server-to-client requests");

    assert!(matches!(send_error, TransportError::Io(_)));
}
1626
/// After the response queue's lock has been poisoned by a panicking
/// thread, `send` must report an I/O transport error.
#[test]
fn e2e_http_streaming_send_fails_when_response_queue_poisoned() {
    use fastmcp_protocol::RequestId;
    use std::thread;

    let mut transport = StreamableHttpTransport::new();

    // Poison the lock: panic on another thread while holding the guard.
    let shared_queue = transport.response_queue();
    let poisoner = thread::spawn(move || {
        let _guard = shared_queue.lock().expect("lock response queue");
        std::panic::panic_any("poison response queue");
    });
    let join_outcome = poisoner.join();
    assert!(join_outcome.is_err());

    let cx = Cx::for_testing();
    let outgoing = JsonRpcMessage::Response(JsonRpcResponse {
        id: Some(RequestId::Number(1)),
        result: Some(serde_json::json!({"ok": true})),
        error: None,
        jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
    });

    let send_error = transport
        .send(&cx, &outgoing)
        .expect_err("poisoned response queue must fail send");
    assert!(matches!(send_error, TransportError::Io(_)));
}
1653
/// After the request queue's lock has been poisoned by a panicking
/// thread, `recv` must report an I/O transport error.
#[test]
fn e2e_http_streaming_recv_fails_when_request_queue_poisoned() {
    use std::thread;

    let mut transport = StreamableHttpTransport::new();

    // Poison the lock: panic on another thread while holding the guard.
    let shared_queue = transport.request_queue();
    let poisoner = thread::spawn(move || {
        let _guard = shared_queue.lock().expect("lock request queue");
        std::panic::panic_any("poison request queue");
    });
    let join_outcome = poisoner.join();
    assert!(join_outcome.is_err());

    let cx = Cx::for_testing();
    let recv_outcome = transport.recv(&cx);
    let recv_error = recv_outcome.expect_err("poisoned request queue must fail recv");
    assert!(matches!(recv_error, TransportError::Io(_)));
}
1672
/// `push_request` must still enqueue a request when the request queue's
/// lock has been poisoned by a panicking thread.
#[test]
fn e2e_http_streaming_push_request_recovers_from_poisoned_queue() {
    use std::thread;

    let transport = StreamableHttpTransport::new();
    let queue = transport.request_queue();

    // Poison the lock through a second handle so `queue` stays usable below.
    let queue_for_poisoner = Arc::clone(&queue);
    let poisoner = thread::spawn(move || {
        let _guard = queue_for_poisoner.lock().expect("lock request queue");
        std::panic::panic_any("poison request queue");
    });
    let join_outcome = poisoner.join();
    assert!(join_outcome.is_err());

    transport.push_request(JsonRpcRequest::new("recovered-method", None, 7i64));

    // Look inside the queue regardless of the poison flag.
    let contents = queue
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    assert_eq!(contents.len(), 1);
    assert_eq!(contents[0].method, "recovered-method");
}
1695
/// `has_responses` and `pop_response` must keep working when the response
/// queue's lock has been poisoned after a response was enqueued.
#[test]
fn e2e_http_streaming_response_helpers_recover_from_poisoned_queue() {
    use fastmcp_protocol::RequestId;
    use std::thread;

    let transport = StreamableHttpTransport::new();
    let queue = transport.response_queue();

    // Seed one response directly into the queue; the guard is released at
    // the end of this statement, before the lock gets poisoned.
    queue
        .lock()
        .expect("lock response queue")
        .push_back(JsonRpcResponse {
            id: Some(RequestId::Number(9)),
            result: Some(serde_json::json!({"seq": 9})),
            error: None,
            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
        });

    // Poison the lock: panic on another thread while holding the guard.
    let queue_for_poisoner = Arc::clone(&queue);
    let poisoner = thread::spawn(move || {
        let _guard = queue_for_poisoner.lock().expect("lock response queue");
        std::panic::panic_any("poison response queue");
    });
    let join_outcome = poisoner.join();
    assert!(join_outcome.is_err());

    assert!(transport.has_responses());
    let recovered = transport
        .pop_response()
        .expect("poisoned queue should still yield response");
    assert_eq!(recovered.id, Some(RequestId::Number(9)));
}
1726
/// Creates a session, stores a value in it, reads the value back, then
/// waits past the store's timeout and checks the session is gone.
#[test]
fn e2e_http_session_lifecycle() {
    let timeout = Duration::from_millis(100);
    let store = SessionStore::new(timeout);

    let session_id = store.create();
    assert_eq!(store.count(), 1);

    // Mutate a fetched copy and write it back.
    let mut editable = store.get(&session_id).unwrap();
    editable.set("user_id", serde_json::json!(42));
    store.update(editable);

    // The stored value is visible on the next lookup.
    let reloaded = store.get(&session_id).unwrap();
    let expected_user = serde_json::json!(42);
    assert_eq!(reloaded.get("user_id"), Some(&expected_user));

    // Sleep past the timeout; the lookup must now come back empty.
    std::thread::sleep(Duration::from_millis(150));

    assert!(store.get(&session_id).is_none());
}
1750
/// With cancellation requested on the context, `send` must return
/// `TransportError::Cancelled` and leave the output buffer empty.
#[test]
fn e2e_http_transport_cancellation() {
    use std::io::Cursor;

    let empty_input = Cursor::new(Vec::<u8>::new());
    let mut output = Vec::new();

    // Flag cancellation before the transport is used.
    let cx = Cx::for_testing();
    cx.set_cancel_requested(true);

    let mut transport = HttpTransport::new(empty_input, &mut output);

    let outgoing = JsonRpcMessage::Response(JsonRpcResponse {
        id: None,
        result: None,
        error: None,
        jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
    });
    let send_outcome = transport.send(&cx, &outgoing);
    assert!(matches!(send_outcome, Err(TransportError::Cancelled)));

    // Nothing may have reached the writer.
    assert!(output.is_empty());
}
1776
/// Once the transport has been closed, `send` must return
/// `TransportError::Closed`.
#[test]
fn e2e_http_transport_close() {
    use std::io::Cursor;

    let empty_input = Cursor::new(Vec::<u8>::new());
    let mut output = Vec::new();

    let cx = Cx::for_testing();
    let mut transport = HttpTransport::new(empty_input, &mut output);

    transport.close().unwrap();

    let outgoing = JsonRpcMessage::Response(JsonRpcResponse {
        id: None,
        result: None,
        error: None,
        jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
    });
    let send_outcome = transport.send(&cx, &outgoing);
    assert!(matches!(send_outcome, Err(TransportError::Closed)));
}
1800
/// A 200-byte body must be rejected as too large when the handler is
/// configured with a 100-byte limit.
#[test]
fn e2e_http_body_size_limit() {
    let handler = HttpRequestHandler::with_config(HttpHandlerConfig {
        max_body_size: 100,
        ..Default::default()
    });

    // Twice the configured limit.
    let oversized_request = HttpRequest::new(HttpMethod::Post, "/mcp/v1")
        .with_header("Content-Type", "application/json")
        .with_body(vec![b'x'; 200]);

    let outcome = handler.parse_request(&oversized_request);
    assert!(matches!(outcome, Err(HttpError::BodyTooLarge { .. })));
}
1818
/// Every `HttpMethod` variant must survive `as_str` followed by `parse`,
/// in both the canonical upper-case spelling and a lower-case spelling.
#[test]
fn http_method_as_str_round_trips() {
    let methods = [
        HttpMethod::Get,
        HttpMethod::Post,
        HttpMethod::Put,
        HttpMethod::Delete,
        HttpMethod::Options,
        HttpMethod::Head,
        HttpMethod::Patch,
    ];
    for m in methods {
        let s = m.as_str();

        // Compare against `Some(m)` instead of unwrapping so a failure
        // names the offending method rather than panicking on `None`.
        assert_eq!(
            HttpMethod::parse(s),
            Some(m),
            "round trip failed for '{}'",
            s
        );

        // `parse` upper-cases its input, so a lower-case spelling must map
        // to the same variant.
        let lower = s.to_lowercase();
        assert_eq!(
            HttpMethod::parse(&lower),
            Some(m),
            "case-insensitive parse failed for '{}'",
            lower
        );
    }
}
1836
/// Checks both edges of each status-class range: the last code inside and
/// the first code outside, at the lower and the upper boundary.
#[test]
fn http_status_boundary_cases() {
    // Success: 200..300
    assert!(!HttpStatus(199).is_success());
    assert!(HttpStatus(200).is_success());
    assert!(HttpStatus(299).is_success());
    assert!(!HttpStatus(300).is_success());

    // Client error: 400..500
    assert!(!HttpStatus(399).is_client_error());
    assert!(HttpStatus(400).is_client_error());
    assert!(HttpStatus(499).is_client_error());
    assert!(!HttpStatus(500).is_client_error());

    // Server error: 500..600
    assert!(!HttpStatus(499).is_server_error());
    assert!(HttpStatus(500).is_server_error());
    assert!(HttpStatus(599).is_server_error());
    assert!(!HttpStatus(600).is_server_error());
}
1846
/// The `content_type` and `authorization` accessors must return the
/// values of the corresponding request headers.
#[test]
fn http_request_content_type_and_authorization() {
    let request = HttpRequest::new(HttpMethod::Get, "/")
        .with_header("Content-Type", "text/plain")
        .with_header("Authorization", "Bearer token123");

    let content_type = request.content_type();
    assert_eq!(content_type, Some("text/plain"));

    let authorization = request.authorization();
    assert_eq!(authorization, Some("Bearer token123"));
}
1855
/// A JSON body attached to a request must deserialize back through
/// `HttpRequest::json`.
#[test]
fn http_request_json_parse() {
    let payload = serde_json::json!({"key": "value"});
    let encoded = serde_json::to_vec(&payload).unwrap();
    let request = HttpRequest::new(HttpMethod::Post, "/").with_body(encoded);

    let decoded: serde_json::Value = request.json().unwrap();
    assert_eq!(decoded["key"], "value");
}
1864
/// The shorthand response constructors must carry their matching status codes.
#[test]
fn http_response_convenience_constructors() {
    let bad_request = HttpResponse::bad_request();
    let internal_error = HttpResponse::internal_error();

    assert_eq!(bad_request.status, HttpStatus::BAD_REQUEST);
    assert_eq!(internal_error.status, HttpStatus::INTERNAL_SERVER_ERROR);
}
1872
/// Pins the values produced by `HttpHandlerConfig::default()`.
#[test]
fn http_handler_config_defaults() {
    let defaults = HttpHandlerConfig::default();

    assert_eq!(defaults.base_path, "/mcp/v1");
    assert!(defaults.allow_cors);
    // A single wildcard origin.
    assert_eq!(defaults.cors_origins, vec![String::from("*")]);
    assert_eq!(defaults.timeout, Duration::from_secs(30));
    // 10 MiB.
    assert_eq!(defaults.max_body_size, 10 * 1024 * 1024);
}
1882
/// `config()` on a freshly built handler must expose the `/mcp/v1` base path.
#[test]
fn http_handler_config_accessor() {
    let handler = HttpRequestHandler::new();
    let exposed = handler.config();
    assert_eq!(exposed.base_path, "/mcp/v1");
}
1888
/// Each listed `HttpError` variant must render a message containing the
/// expected fragment.
#[test]
fn http_error_display_all_variants() {
    // (error value, fragment its rendered text must contain)
    let expectations: Vec<(HttpError, &str)> = vec![
        (
            HttpError::InvalidMethod("X".into()),
            "invalid HTTP method: X",
        ),
        (
            HttpError::InvalidContentType("text/xml".into()),
            "invalid content type: text/xml",
        ),
        (
            HttpError::HeadersTooLarge { size: 100, max: 50 },
            "headers too large: 100 > 50",
        ),
        (
            HttpError::BodyTooLarge {
                size: 200,
                max: 100,
            },
            "body too large: 200 > 100",
        ),
        (
            HttpError::UnsupportedTransferEncoding("gzip".into()),
            "unsupported transfer encoding: gzip",
        ),
        (HttpError::Timeout, "request timeout"),
        (HttpError::Closed, "connection closed"),
    ];

    for (error, fragment) in expectations {
        let rendered = error.to_string();
        assert!(
            rendered.contains(fragment),
            "expected '{}' in '{}'",
            fragment,
            rendered
        );
    }
}
1927
/// Converting a `CodecError` must yield `HttpError::CodecError` with a
/// message mentioning "codec error".
#[test]
fn http_error_from_codec_error() {
    let converted: HttpError = CodecError::MessageTooLarge(999).into();

    assert!(matches!(converted, HttpError::CodecError(_)));
    let rendered = converted.to_string();
    assert!(rendered.contains("codec error"));
}
1935
/// Converting a `TransportError` must yield `HttpError::Transport` with a
/// message mentioning "transport error".
#[test]
fn http_error_from_transport_error() {
    let converted: HttpError = TransportError::Closed.into();

    assert!(matches!(converted, HttpError::Transport(_)));
    let rendered = converted.to_string();
    assert!(rendered.contains("transport error"));
}
1943
/// Passing a request-shaped message to `HttpTransport::send` must fail.
#[test]
fn http_transport_send_rejects_request_messages() {
    use std::io::Cursor;

    let empty_input = Cursor::new(Vec::<u8>::new());
    let mut output = Vec::new();
    let cx = Cx::for_testing();
    let mut transport = HttpTransport::new(empty_input, &mut output);

    let outgoing = JsonRpcMessage::Request(JsonRpcRequest::new("test", None, 1i64));
    let send_outcome = transport.send(&cx, &outgoing);
    assert!(send_outcome.is_err());
}
1957
/// After the timeout has elapsed, `cleanup` must leave the store empty.
#[test]
fn session_store_cleanup_removes_expired() {
    let timeout = Duration::from_millis(50);
    let store = SessionStore::new(timeout);

    let _first = store.create();
    let _second = store.create();
    assert_eq!(store.count(), 2);

    // Sleep past the timeout so both sessions count as expired.
    std::thread::sleep(Duration::from_millis(100));
    store.cleanup();
    assert_eq!(store.count(), 0);
}
1969
/// With CORS switched off, an OPTIONS request must be answered with
/// 405 Method Not Allowed.
#[test]
fn handle_options_cors_disabled() {
    let handler = HttpRequestHandler::with_config(HttpHandlerConfig {
        allow_cors: false,
        ..Default::default()
    });

    let preflight = HttpRequest::new(HttpMethod::Options, "/mcp/v1");
    let answer = handler.handle_options(&preflight);
    assert_eq!(answer.status, HttpStatus::METHOD_NOT_ALLOWED);
}
1981}