1#![forbid(unsafe_code)]
34#![allow(dead_code)]
35
36mod async_io;
37mod codec;
38pub mod event_store;
39pub mod http;
40pub mod memory;
41pub mod sse;
42mod stdio;
43pub mod websocket;
44
45pub use async_io::{AsyncLineReader, AsyncStdin, AsyncStdout};
46
47pub use codec::{Codec, CodecError};
48pub use stdio::{AsyncStdioTransport, StdioTransport};
49
50use asupersync::Cx;
51use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
52
53pub trait Transport {
78 fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError>;
90
91 fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError>;
103
104 fn send_request(&mut self, cx: &Cx, request: &JsonRpcRequest) -> Result<(), TransportError> {
108 self.send(cx, &JsonRpcMessage::Request(request.clone()))
109 }
110
111 fn send_response(&mut self, cx: &Cx, response: &JsonRpcResponse) -> Result<(), TransportError> {
115 self.send(cx, &JsonRpcMessage::Response(response.clone()))
116 }
117
118 fn close(&mut self) -> Result<(), TransportError>;
122}
123
124#[derive(Debug)]
126pub enum TransportError {
127 Io(std::io::Error),
129 Closed,
131 Codec(CodecError),
133 Timeout,
135 Cancelled,
137}
138
139impl TransportError {
140 #[must_use]
142 pub fn is_cancelled(&self) -> bool {
143 matches!(self, TransportError::Cancelled)
144 }
145
146 #[must_use]
148 pub fn is_closed(&self) -> bool {
149 matches!(self, TransportError::Closed)
150 }
151}
152
153impl std::fmt::Display for TransportError {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 match self {
156 TransportError::Io(e) => write!(f, "I/O error: {e}"),
157 TransportError::Closed => write!(f, "Transport closed"),
158 TransportError::Codec(e) => write!(f, "Codec error: {e}"),
159 TransportError::Timeout => write!(f, "Connection timeout"),
160 TransportError::Cancelled => write!(f, "Request cancelled"),
161 }
162 }
163}
164
165impl std::error::Error for TransportError {
166 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
167 match self {
168 TransportError::Io(e) => Some(e),
169 TransportError::Codec(e) => Some(e),
170 _ => None,
171 }
172 }
173}
174
175impl From<std::io::Error> for TransportError {
176 fn from(err: std::io::Error) -> Self {
177 TransportError::Io(err)
178 }
179}
180
181impl From<CodecError> for TransportError {
182 fn from(err: CodecError) -> Self {
183 TransportError::Codec(err)
184 }
185}
186
187pub struct SendPermit<'a, W: std::io::Write> {
224 writer: &'a mut W,
225 codec: &'a Codec,
226}
227
228impl<'a, W: std::io::Write> SendPermit<'a, W> {
229 fn new(writer: &'a mut W, codec: &'a Codec) -> Self {
234 Self { writer, codec }
235 }
236
237 pub fn send(self, message: &JsonRpcMessage) -> Result<(), TransportError> {
248 let bytes = match message {
249 JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
250 JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
251 };
252
253 self.writer.write_all(&bytes)?;
254 self.writer.flush()?;
255 Ok(())
256 }
257
258 pub fn send_request(self, request: &JsonRpcRequest) -> Result<(), TransportError> {
266 let bytes = self.codec.encode_request(request)?;
267 self.writer.write_all(&bytes)?;
268 self.writer.flush()?;
269 Ok(())
270 }
271
272 pub fn send_response(self, response: &JsonRpcResponse) -> Result<(), TransportError> {
280 let bytes = self.codec.encode_response(response)?;
281 self.writer.write_all(&bytes)?;
282 self.writer.flush()?;
283 Ok(())
284 }
285}
286
287pub trait TwoPhaseTransport: Transport {
317 type Writer: std::io::Write;
319
320 fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError>;
329}
330
#[cfg(test)]
mod tests {
    use super::{Codec, CodecError, SendPermit, Transport, TransportError, TwoPhaseTransport};
    use asupersync::Cx;
    use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, RequestId};
    use std::error::Error;

    /// In-memory transport that records every sent message and whether
    /// `close` was called. `recv` always reports `Closed`.
    #[derive(Default)]
    struct RecordingTransport {
        sent: Vec<JsonRpcMessage>,
        closed: bool,
    }

    impl Transport for RecordingTransport {
        fn send(&mut self, _cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
            self.sent.push(message.clone());
            Ok(())
        }

        fn recv(&mut self, _cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
            Err(TransportError::Closed)
        }

        fn close(&mut self) -> Result<(), TransportError> {
            self.closed = true;
            Ok(())
        }
    }

    /// Two-phase transport that writes encoded messages into a `Vec<u8>` so
    /// tests can decode what was written.
    struct TwoPhaseFixture {
        writer: Vec<u8>,
        codec: Codec,
    }

    impl Default for TwoPhaseFixture {
        fn default() -> Self {
            Self {
                writer: Vec::new(),
                codec: Codec::new(),
            }
        }
    }

    impl Transport for TwoPhaseFixture {
        fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
            let permit = self.reserve_send(cx)?;
            permit.send(message)
        }

        fn recv(&mut self, _cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
            Err(TransportError::Closed)
        }

        fn close(&mut self) -> Result<(), TransportError> {
            Ok(())
        }
    }

    impl TwoPhaseTransport for TwoPhaseFixture {
        type Writer = Vec<u8>;

        fn reserve_send(
            &mut self,
            cx: &Cx,
        ) -> Result<SendPermit<'_, Self::Writer>, TransportError> {
            // Refuse to hand out a permit once cancellation has been requested.
            if cx.is_cancel_requested() {
                return Err(TransportError::Cancelled);
            }

            Ok(SendPermit::new(&mut self.writer, &self.codec))
        }
    }

    /// Builds a boxed error carrying `message`, for use as a test failure.
    fn test_error(message: &str) -> Box<dyn Error> {
        std::io::Error::other(message).into()
    }

    /// Returns `Ok(())` when `condition` holds, otherwise a failure carrying `message`.
    fn require(condition: bool, message: &str) -> Result<(), Box<dyn Error>> {
        if condition {
            Ok(())
        } else {
            Err(test_error(message))
        }
    }

    #[test]
    fn transport_error_predicates_match_variants() -> Result<(), Box<dyn Error>> {
        require(
            TransportError::Cancelled.is_cancelled(),
            "cancelled flag mismatch",
        )?;
        require(
            !TransportError::Timeout.is_cancelled(),
            "timeout should not be cancelled",
        )?;
        require(TransportError::Closed.is_closed(), "closed flag mismatch")?;
        require(
            !TransportError::Timeout.is_closed(),
            "timeout should not be closed",
        )?;
        Ok(())
    }

    #[test]
    fn transport_error_display_and_source_are_exposed() -> Result<(), Box<dyn Error>> {
        let io_error = std::io::Error::other("write failed");
        let io_transport_error = TransportError::Io(io_error);
        require(
            io_transport_error.to_string() == "I/O error: write failed",
            "io display mismatch",
        )?;
        require(
            io_transport_error.source().is_some(),
            "io source should exist",
        )?;

        // Parsing invalid JSON is the simplest way to obtain a real codec error.
        let json_error = match serde_json::from_str::<serde_json::Value>("not json") {
            Err(err) => err,
            Ok(_) => return Err(test_error("invalid json unexpectedly parsed")),
        };
        let codec_error = CodecError::from(json_error);
        let codec_transport_error = TransportError::Codec(codec_error);
        require(
            codec_transport_error
                .to_string()
                .starts_with("Codec error: JSON error:"),
            "codec display mismatch",
        )?;
        require(
            codec_transport_error.source().is_some(),
            "codec source should exist",
        )?;

        require(
            TransportError::Timeout.source().is_none(),
            "timeout should not have source",
        )?;
        require(
            TransportError::Closed.source().is_none(),
            "closed should not have source",
        )?;
        require(
            TransportError::Cancelled.source().is_none(),
            "cancelled should not have source",
        )?;
        Ok(())
    }

    #[test]
    fn transport_error_from_conversions_wrap_underlying_types() -> Result<(), Box<dyn Error>> {
        let io_transport_error = TransportError::from(std::io::Error::other("socket closed"));
        require(
            matches!(io_transport_error, TransportError::Io(_)),
            "io conversion mismatch",
        )?;

        let json_error = match serde_json::from_str::<serde_json::Value>("bad json") {
            Err(err) => err,
            Ok(_) => return Err(test_error("invalid json unexpectedly parsed")),
        };
        let codec_transport_error = TransportError::from(CodecError::from(json_error));
        require(
            matches!(codec_transport_error, TransportError::Codec(_)),
            "codec conversion mismatch",
        )?;
        Ok(())
    }

    #[test]
    fn send_request_wraps_request_message() -> Result<(), Box<dyn Error>> {
        let mut transport = RecordingTransport::default();
        let cx = Cx::for_testing();
        let request = JsonRpcRequest::new("tools/list", None, 7i64);

        transport.send_request(&cx, &request)?;

        require(transport.sent.len() == 1, "expected one sent message")?;
        match &transport.sent[0] {
            JsonRpcMessage::Request(req) => {
                require(req.method == "tools/list", "request method mismatch")?;
                require(
                    req.id == Some(RequestId::Number(7)),
                    "request id mismatch for wrapped message",
                )?;
            }
            JsonRpcMessage::Response(_) => {
                return Err(test_error("expected request message"));
            }
        }
        Ok(())
    }

    #[test]
    fn send_response_wraps_response_message() -> Result<(), Box<dyn Error>> {
        let mut transport = RecordingTransport::default();
        let cx = Cx::for_testing();
        let response = JsonRpcResponse::success(
            RequestId::Number(9),
            serde_json::json!({"server": "fastmcp"}),
        );

        transport.send_response(&cx, &response)?;

        require(transport.sent.len() == 1, "expected one sent message")?;
        match &transport.sent[0] {
            JsonRpcMessage::Response(resp) => {
                require(
                    resp.id == Some(RequestId::Number(9)),
                    "response id mismatch for wrapped message",
                )?;
            }
            JsonRpcMessage::Request(_) => {
                return Err(test_error("expected response message"));
            }
        }
        Ok(())
    }

    #[test]
    fn send_permit_writes_request_bytes() -> Result<(), Box<dyn Error>> {
        let cx = Cx::for_testing();
        let mut fixture = TwoPhaseFixture::default();
        let request = JsonRpcRequest::new("resources/list", None, 11i64);

        let permit = fixture.reserve_send(&cx)?;
        permit.send_request(&request)?;

        // Round-trip through a fresh codec to check what actually hit the writer.
        let mut decode_codec = Codec::new();
        let messages = decode_codec.decode(&fixture.writer)?;
        require(messages.len() == 1, "expected one decoded message")?;
        match &messages[0] {
            JsonRpcMessage::Request(req) => {
                require(
                    req.method == "resources/list",
                    "decoded request method mismatch",
                )?;
                require(
                    req.id == Some(RequestId::Number(11)),
                    "decoded request id mismatch",
                )?;
            }
            JsonRpcMessage::Response(_) => {
                return Err(test_error("expected request message"));
            }
        }
        Ok(())
    }

    #[test]
    fn send_permit_writes_response_bytes() -> Result<(), Box<dyn Error>> {
        let cx = Cx::for_testing();
        let mut fixture = TwoPhaseFixture::default();
        let response =
            JsonRpcResponse::success(RequestId::Number(22), serde_json::json!({"status": "ok"}));

        let permit = fixture.reserve_send(&cx)?;
        permit.send_response(&response)?;

        let mut decode_codec = Codec::new();
        let messages = decode_codec.decode(&fixture.writer)?;
        require(messages.len() == 1, "expected one decoded message")?;
        match &messages[0] {
            JsonRpcMessage::Response(resp) => {
                require(
                    resp.id == Some(RequestId::Number(22)),
                    "decoded response id mismatch",
                )?;
            }
            JsonRpcMessage::Request(_) => {
                return Err(test_error("expected response message"));
            }
        }
        Ok(())
    }

    #[test]
    fn reserve_send_returns_cancelled_when_context_is_cancelled() -> Result<(), Box<dyn Error>> {
        let cx = Cx::for_testing();
        cx.set_cancel_requested(true);
        let mut fixture = TwoPhaseFixture::default();

        let result = fixture.reserve_send(&cx);

        match result {
            Err(TransportError::Cancelled) => Ok(()),
            _ => Err(test_error("reserve_send should return cancelled")),
        }
    }

    #[test]
    fn recording_transport_close() {
        let mut transport = RecordingTransport::default();
        assert!(!transport.closed);
        transport.close().unwrap();
        assert!(transport.closed);
    }

    #[test]
    fn recording_transport_recv_returns_closed() {
        let mut transport = RecordingTransport::default();
        let cx = Cx::for_testing();
        let result = transport.recv(&cx);
        assert!(matches!(result, Err(TransportError::Closed)));
    }

    #[test]
    fn transport_error_display_all_variants() {
        assert_eq!(TransportError::Closed.to_string(), "Transport closed");
        assert_eq!(TransportError::Timeout.to_string(), "Connection timeout");
        assert_eq!(TransportError::Cancelled.to_string(), "Request cancelled");
    }

    #[test]
    fn transport_error_is_cancelled_false_for_other_variants() {
        assert!(!TransportError::Closed.is_cancelled());
        assert!(!TransportError::Io(std::io::Error::other("err")).is_cancelled());
        assert!(!TransportError::Codec(CodecError::MessageTooLarge(1)).is_cancelled());
    }

    #[test]
    fn transport_error_is_closed_false_for_other_variants() {
        assert!(!TransportError::Cancelled.is_closed());
        assert!(!TransportError::Timeout.is_closed());
        assert!(!TransportError::Io(std::io::Error::other("err")).is_closed());
        assert!(!TransportError::Codec(CodecError::MessageTooLarge(1)).is_closed());
    }

    #[test]
    fn send_permit_sends_request_as_message() -> Result<(), Box<dyn Error>> {
        let cx = Cx::for_testing();
        let mut fixture = TwoPhaseFixture::default();
        let request = JsonRpcRequest::new("tools/call", None, 1i64);

        let permit = fixture.reserve_send(&cx)?;
        permit.send(&JsonRpcMessage::Request(request))?;

        let mut decode_codec = Codec::new();
        let messages = decode_codec.decode(&fixture.writer)?;
        require(messages.len() == 1, "expected one decoded message")?;
        match &messages[0] {
            JsonRpcMessage::Request(req) => {
                require(req.method == "tools/call", "method mismatch")?;
            }
            _ => return Err(test_error("expected request")),
        }
        Ok(())
    }

    #[test]
    fn send_permit_sends_response_as_message() -> Result<(), Box<dyn Error>> {
        let cx = Cx::for_testing();
        let mut fixture = TwoPhaseFixture::default();
        let response =
            JsonRpcResponse::success(RequestId::Number(5), serde_json::json!({"ok": true}));

        let permit = fixture.reserve_send(&cx)?;
        permit.send(&JsonRpcMessage::Response(response))?;

        let mut decode_codec = Codec::new();
        let messages = decode_codec.decode(&fixture.writer)?;
        require(messages.len() == 1, "expected one decoded message")?;
        assert!(matches!(&messages[0], JsonRpcMessage::Response(_)));
        Ok(())
    }

    #[test]
    fn send_multiple_messages_via_transport() {
        let mut transport = RecordingTransport::default();
        let cx = Cx::for_testing();

        // An `i64` range supplies the request id directly, with no cast.
        for i in 0..5_i64 {
            let request = JsonRpcRequest::new(format!("method/{i}"), None, i);
            transport.send_request(&cx, &request).unwrap();
        }

        assert_eq!(transport.sent.len(), 5);
        for (i, msg) in transport.sent.iter().enumerate() {
            if let JsonRpcMessage::Request(req) = msg {
                assert_eq!(req.method, format!("method/{i}"));
            } else {
                panic!("expected request at index {i}");
            }
        }
    }

    #[test]
    fn two_phase_fixture_send_via_transport_trait() -> Result<(), Box<dyn Error>> {
        let cx = Cx::for_testing();
        let mut fixture = TwoPhaseFixture::default();
        let request = JsonRpcRequest::new("test/method", None, 42i64);

        fixture.send(&cx, &JsonRpcMessage::Request(request))?;

        let mut decode_codec = Codec::new();
        let messages = decode_codec.decode(&fixture.writer)?;
        require(messages.len() == 1, "expected one decoded message")?;
        Ok(())
    }

    #[test]
    fn two_phase_fixture_close_succeeds() {
        let mut fixture = TwoPhaseFixture::default();
        assert!(fixture.close().is_ok());
    }

    #[test]
    fn two_phase_multiple_sends() -> Result<(), Box<dyn Error>> {
        let cx = Cx::for_testing();
        let mut fixture = TwoPhaseFixture::default();

        // Each permit is single-use, so a new one is reserved per message.
        for i in 0..3_i64 {
            let permit = fixture.reserve_send(&cx)?;
            let request = JsonRpcRequest::new(format!("method/{i}"), None, i);
            permit.send_request(&request)?;
        }

        let mut decode_codec = Codec::new();
        let messages = decode_codec.decode(&fixture.writer)?;
        require(messages.len() == 3, "expected three decoded messages")?;
        Ok(())
    }

    #[test]
    fn send_permit_notification_without_id() -> Result<(), Box<dyn Error>> {
        let cx = Cx::for_testing();
        let mut fixture = TwoPhaseFixture::default();
        let notification = JsonRpcRequest::notification("notifications/progress", None);

        let permit = fixture.reserve_send(&cx)?;
        permit.send_request(&notification)?;

        let mut decode_codec = Codec::new();
        let messages = decode_codec.decode(&fixture.writer)?;
        require(messages.len() == 1, "expected one decoded message")?;
        // Fail loudly if the notification does not round-trip as a request,
        // rather than skipping the id check.
        match &messages[0] {
            JsonRpcMessage::Request(req) => {
                require(req.id.is_none(), "notification should have no id")?;
            }
            JsonRpcMessage::Response(_) => {
                return Err(test_error("expected request message"));
            }
        }
        Ok(())
    }
}