Skip to main content

fastmcp_transport/
lib.rs

1//! Transport layer for FastMCP.
2//!
3//! This crate provides transport implementations for MCP communication:
4//! - **Stdio**: Standard input/output (primary transport)
5//! - **SSE**: Server-Sent Events (HTTP-based streaming)
6//! - **WebSocket**: Bidirectional web sockets
7//!
8//! # Transport Design
9//!
10//! Transports are designed around asupersync's principles:
11//!
12//! - **Cancel-correctness**: All operations check cancellation via `Cx::checkpoint()`
13//! - **Two-phase sends**: Use reserve/commit pattern to prevent message loss
14//! - **Budget awareness**: Operations respect the request's budget constraints
15//!
16//! # Wire Format
17//!
18//! MCP uses newline-delimited JSON (NDJSON) for message framing:
19//! - Each message is a single line of JSON
20//! - Messages are separated by `\n`
21//! - UTF-8 encoding is required
22//!
23//! # Role in the System
24//!
25//! `fastmcp-transport` is the **I/O boundary** for FastMCP. It is deliberately
26//! protocol-agnostic: transports move `JsonRpcMessage` values in and out while
27//! the server/client layers handle semantics. This keeps transport
28//! implementations small, testable, and reusable.
29//!
30//! If you need to add a new transport (for example, QUIC or a custom IPC),
31//! this is the crate to extend.
32
33#![forbid(unsafe_code)]
34#![allow(dead_code)]
35
36mod async_io;
37mod codec;
38pub mod event_store;
39pub mod http;
40pub mod memory;
41pub mod sse;
42mod stdio;
43pub mod websocket;
44
45pub use async_io::{AsyncLineReader, AsyncStdin, AsyncStdout};
46
47pub use codec::{Codec, CodecError};
48pub use stdio::{AsyncStdioTransport, StdioTransport};
49
50use asupersync::Cx;
51use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
52
53/// Transport trait for cancel-correct message passing.
54///
55/// All transports must integrate with asupersync's capability context (`Cx`)
56/// for cancellation checking and budget enforcement.
57///
58/// # Cancel-Safety
59///
60/// Implementations should:
61/// - Call `cx.checkpoint()` before blocking operations
62/// - Use two-phase patterns (reserve/commit) where applicable
63/// - Respect budget constraints from the context
64///
65/// # Example
66///
67/// ```ignore
68/// impl Transport for MyTransport {
69///     fn send(&mut self, cx: &Cx, msg: &JsonRpcMessage) -> Result<(), TransportError> {
70///         cx.checkpoint()?;  // Check for cancellation
71///         let bytes = self.codec.encode(msg)?;
72///         self.write_all(&bytes)?;
73///         Ok(())
74///     }
75/// }
76/// ```
77pub trait Transport {
78    /// Send a JSON-RPC message through this transport.
79    ///
80    /// # Cancel-Safety
81    ///
82    /// This operation checks for cancellation before sending.
83    /// If cancelled, the message is not sent.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if the transport is closed, an I/O error occurs,
88    /// or the request has been cancelled.
89    fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError>;
90
91    /// Receive the next JSON-RPC message from this transport.
92    ///
93    /// # Cancel-Safety
94    ///
95    /// This operation checks for cancellation while waiting for data.
96    /// If cancelled, returns `TransportError::Cancelled`.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the transport is closed, an I/O error occurs,
101    /// or the request has been cancelled.
102    fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError>;
103
104    /// Send a request through this transport.
105    ///
106    /// Convenience method that wraps a request in a message.
107    fn send_request(&mut self, cx: &Cx, request: &JsonRpcRequest) -> Result<(), TransportError> {
108        self.send(cx, &JsonRpcMessage::Request(request.clone()))
109    }
110
111    /// Send a response through this transport.
112    ///
113    /// Convenience method that wraps a response in a message.
114    fn send_response(&mut self, cx: &Cx, response: &JsonRpcResponse) -> Result<(), TransportError> {
115        self.send(cx, &JsonRpcMessage::Response(response.clone()))
116    }
117
118    /// Close the transport gracefully.
119    ///
120    /// This flushes any pending data and releases resources.
121    fn close(&mut self) -> Result<(), TransportError>;
122}
123
124/// Transport error types.
125#[derive(Debug)]
126pub enum TransportError {
127    /// I/O error during read or write.
128    Io(std::io::Error),
129    /// Transport was closed (EOF or explicit close).
130    Closed,
131    /// Codec error (JSON parsing or encoding).
132    Codec(CodecError),
133    /// Connection timeout.
134    Timeout,
135    /// Request was cancelled.
136    Cancelled,
137}
138
139impl TransportError {
140    /// Returns true if this is a cancellation error.
141    #[must_use]
142    pub fn is_cancelled(&self) -> bool {
143        matches!(self, TransportError::Cancelled)
144    }
145
146    /// Returns true if this is an EOF/closed condition.
147    #[must_use]
148    pub fn is_closed(&self) -> bool {
149        matches!(self, TransportError::Closed)
150    }
151}
152
153impl std::fmt::Display for TransportError {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        match self {
156            TransportError::Io(e) => write!(f, "I/O error: {e}"),
157            TransportError::Closed => write!(f, "Transport closed"),
158            TransportError::Codec(e) => write!(f, "Codec error: {e}"),
159            TransportError::Timeout => write!(f, "Connection timeout"),
160            TransportError::Cancelled => write!(f, "Request cancelled"),
161        }
162    }
163}
164
165impl std::error::Error for TransportError {
166    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
167        match self {
168            TransportError::Io(e) => Some(e),
169            TransportError::Codec(e) => Some(e),
170            _ => None,
171        }
172    }
173}
174
175impl From<std::io::Error> for TransportError {
176    fn from(err: std::io::Error) -> Self {
177        TransportError::Io(err)
178    }
179}
180
181impl From<CodecError> for TransportError {
182    fn from(err: CodecError) -> Self {
183        TransportError::Codec(err)
184    }
185}
186
187// =============================================================================
188// Two-Phase Send Protocol
189// =============================================================================
190
191/// A permit for sending a message via two-phase commit.
192///
193/// This implements the reserve/commit pattern for cancel-safe message sending:
194/// 1. **Reserve**: Allocate the permit (cancellable)
195/// 2. **Commit**: Send the message (infallible after reserve)
196///
197/// # Cancel-Safety
198///
199/// The reservation phase is the cancellation point. Once you have a permit,
200/// the send will complete. This ensures no message loss on cancellation:
201///
202/// ```ignore
203/// // Cancel-safe pattern:
204/// let permit = transport.reserve_send(cx)?;  // Can be cancelled here
205/// permit.send(message);                       // Always succeeds
206/// ```
207///
208/// # Example
209///
210/// ```ignore
211/// use fastmcp_transport::{AsyncStdioTransport, TwoPhaseTransport};
212/// use asupersync::Cx;
213///
214/// let mut transport = AsyncStdioTransport::new();
215/// let cx = Cx::for_testing();
216///
217/// // Reserve a send slot (cancellable)
218/// let permit = transport.reserve_send(&cx)?;
219///
220/// // At this point, we're committed - send is infallible
221/// permit.send(&JsonRpcMessage::Request(request));
222/// ```
223pub struct SendPermit<'a, W: std::io::Write> {
224    writer: &'a mut W,
225    codec: &'a Codec,
226}
227
228impl<'a, W: std::io::Write> SendPermit<'a, W> {
229    /// Creates a new send permit.
230    ///
231    /// This is an internal constructor. Use `TwoPhaseTransport::reserve_send()`
232    /// to obtain a permit.
233    fn new(writer: &'a mut W, codec: &'a Codec) -> Self {
234        Self { writer, codec }
235    }
236
237    /// Commits the send by writing the message.
238    ///
239    /// This method is synchronous and, from the protocol's perspective,
240    /// infallible after reservation. I/O errors are returned but the
241    /// reservation is consumed regardless.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if the underlying write fails. However, the permit
246    /// is consumed and the reservation is released.
247    pub fn send(self, message: &JsonRpcMessage) -> Result<(), TransportError> {
248        let bytes = match message {
249            JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
250            JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
251        };
252
253        self.writer.write_all(&bytes)?;
254        self.writer.flush()?;
255        Ok(())
256    }
257
258    /// Commits the send by writing a request.
259    ///
260    /// Convenience method for sending a request directly.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if the underlying write fails.
265    pub fn send_request(self, request: &JsonRpcRequest) -> Result<(), TransportError> {
266        let bytes = self.codec.encode_request(request)?;
267        self.writer.write_all(&bytes)?;
268        self.writer.flush()?;
269        Ok(())
270    }
271
272    /// Commits the send by writing a response.
273    ///
274    /// Convenience method for sending a response directly.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the underlying write fails.
279    pub fn send_response(self, response: &JsonRpcResponse) -> Result<(), TransportError> {
280        let bytes = self.codec.encode_response(response)?;
281        self.writer.write_all(&bytes)?;
282        self.writer.flush()?;
283        Ok(())
284    }
285}
286
287/// Extension trait for two-phase send operations.
288///
289/// This trait adds the reserve/commit pattern to transports. The pattern
290/// ensures cancel-safety by making the reservation the cancellation point:
291///
292/// - **Reserve phase**: Check cancellation, allocate resources
293/// - **Commit phase**: Actually send (synchronous, infallible from protocol perspective)
294///
295/// # Why Two-Phase?
296///
297/// Without two-phase:
298/// ```ignore
299/// // BROKEN: Can lose messages on cancel
300/// async fn bad_send(cx: &Cx, msg: Message) {
301///     let serialized = serialize(&msg);  // Work done
302///     cx.checkpoint()?;                   // Cancel here = lost message!
303///     writer.write(&serialized).await;
304/// }
305/// ```
306///
307/// With two-phase:
308/// ```ignore
309/// // CORRECT: Either fully sent or not started
310/// async fn good_send(cx: &Cx, msg: Message) {
311///     let permit = transport.reserve_send(cx)?;  // Cancel here = no work lost
312///     // After reserve, commit is synchronous and infallible
313///     permit.send(msg);
314/// }
315/// ```
316pub trait TwoPhaseTransport: Transport {
317    /// The writer type for permits.
318    type Writer: std::io::Write;
319
320    /// Reserve a send slot.
321    ///
322    /// This is the cancellation point for sends. If this succeeds, the
323    /// subsequent `permit.send()` will complete.
324    ///
325    /// # Errors
326    ///
327    /// Returns `TransportError::Cancelled` if the request has been cancelled.
328    fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError>;
329}