fastmcp_transport/lib.rs
1//! Transport layer for FastMCP.
2//!
3//! This crate provides transport implementations for MCP communication:
4//! - **Stdio**: Standard input/output (primary transport)
5//! - **SSE**: Server-Sent Events (HTTP-based streaming)
6//! - **WebSocket**: Bidirectional web sockets
7//!
8//! # Transport Design
9//!
10//! Transports are designed around asupersync's principles:
11//!
12//! - **Cancel-correctness**: All operations check cancellation via `Cx::checkpoint()`
13//! - **Two-phase sends**: Use reserve/commit pattern to prevent message loss
14//! - **Budget awareness**: Operations respect the request's budget constraints
15//!
16//! # Wire Format
17//!
18//! MCP uses newline-delimited JSON (NDJSON) for message framing:
19//! - Each message is a single line of JSON
20//! - Messages are separated by `\n`
21//! - UTF-8 encoding is required
22//!
23//! # Role in the System
24//!
25//! `fastmcp-transport` is the **I/O boundary** for FastMCP. It is deliberately
26//! protocol-agnostic: transports move `JsonRpcMessage` values in and out while
27//! the server/client layers handle semantics. This keeps transport
28//! implementations small, testable, and reusable.
29//!
30//! If you need to add a new transport (for example, QUIC or a custom IPC),
31//! this is the crate to extend.
32
33#![forbid(unsafe_code)]
34#![allow(dead_code)]
35
36mod async_io;
37mod codec;
38pub mod event_store;
39pub mod http;
40pub mod memory;
41pub mod sse;
42mod stdio;
43pub mod websocket;
44
45pub use async_io::{AsyncLineReader, AsyncStdin, AsyncStdout};
46
47pub use codec::{Codec, CodecError};
48pub use stdio::{AsyncStdioTransport, StdioTransport};
49
50use asupersync::Cx;
51use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
52
53/// Transport trait for cancel-correct message passing.
54///
55/// All transports must integrate with asupersync's capability context (`Cx`)
56/// for cancellation checking and budget enforcement.
57///
58/// # Cancel-Safety
59///
60/// Implementations should:
61/// - Call `cx.checkpoint()` before blocking operations
62/// - Use two-phase patterns (reserve/commit) where applicable
63/// - Respect budget constraints from the context
64///
65/// # Example
66///
67/// ```ignore
68/// impl Transport for MyTransport {
69/// fn send(&mut self, cx: &Cx, msg: &JsonRpcMessage) -> Result<(), TransportError> {
70/// cx.checkpoint()?; // Check for cancellation
71/// let bytes = self.codec.encode(msg)?;
72/// self.write_all(&bytes)?;
73/// Ok(())
74/// }
75/// }
76/// ```
77pub trait Transport {
78 /// Send a JSON-RPC message through this transport.
79 ///
80 /// # Cancel-Safety
81 ///
82 /// This operation checks for cancellation before sending.
83 /// If cancelled, the message is not sent.
84 ///
85 /// # Errors
86 ///
87 /// Returns an error if the transport is closed, an I/O error occurs,
88 /// or the request has been cancelled.
89 fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError>;
90
91 /// Receive the next JSON-RPC message from this transport.
92 ///
93 /// # Cancel-Safety
94 ///
95 /// This operation checks for cancellation while waiting for data.
96 /// If cancelled, returns `TransportError::Cancelled`.
97 ///
98 /// # Errors
99 ///
100 /// Returns an error if the transport is closed, an I/O error occurs,
101 /// or the request has been cancelled.
102 fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError>;
103
104 /// Send a request through this transport.
105 ///
106 /// Convenience method that wraps a request in a message.
107 fn send_request(&mut self, cx: &Cx, request: &JsonRpcRequest) -> Result<(), TransportError> {
108 self.send(cx, &JsonRpcMessage::Request(request.clone()))
109 }
110
111 /// Send a response through this transport.
112 ///
113 /// Convenience method that wraps a response in a message.
114 fn send_response(&mut self, cx: &Cx, response: &JsonRpcResponse) -> Result<(), TransportError> {
115 self.send(cx, &JsonRpcMessage::Response(response.clone()))
116 }
117
118 /// Close the transport gracefully.
119 ///
120 /// This flushes any pending data and releases resources.
121 fn close(&mut self) -> Result<(), TransportError>;
122}
123
124/// Transport error types.
125#[derive(Debug)]
126pub enum TransportError {
127 /// I/O error during read or write.
128 Io(std::io::Error),
129 /// Transport was closed (EOF or explicit close).
130 Closed,
131 /// Codec error (JSON parsing or encoding).
132 Codec(CodecError),
133 /// Connection timeout.
134 Timeout,
135 /// Request was cancelled.
136 Cancelled,
137}
138
139impl TransportError {
140 /// Returns true if this is a cancellation error.
141 #[must_use]
142 pub fn is_cancelled(&self) -> bool {
143 matches!(self, TransportError::Cancelled)
144 }
145
146 /// Returns true if this is an EOF/closed condition.
147 #[must_use]
148 pub fn is_closed(&self) -> bool {
149 matches!(self, TransportError::Closed)
150 }
151}
152
153impl std::fmt::Display for TransportError {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 match self {
156 TransportError::Io(e) => write!(f, "I/O error: {e}"),
157 TransportError::Closed => write!(f, "Transport closed"),
158 TransportError::Codec(e) => write!(f, "Codec error: {e}"),
159 TransportError::Timeout => write!(f, "Connection timeout"),
160 TransportError::Cancelled => write!(f, "Request cancelled"),
161 }
162 }
163}
164
165impl std::error::Error for TransportError {
166 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
167 match self {
168 TransportError::Io(e) => Some(e),
169 TransportError::Codec(e) => Some(e),
170 _ => None,
171 }
172 }
173}
174
175impl From<std::io::Error> for TransportError {
176 fn from(err: std::io::Error) -> Self {
177 TransportError::Io(err)
178 }
179}
180
181impl From<CodecError> for TransportError {
182 fn from(err: CodecError) -> Self {
183 TransportError::Codec(err)
184 }
185}
186
187// =============================================================================
188// Two-Phase Send Protocol
189// =============================================================================
190
191/// A permit for sending a message via two-phase commit.
192///
193/// This implements the reserve/commit pattern for cancel-safe message sending:
194/// 1. **Reserve**: Allocate the permit (cancellable)
195/// 2. **Commit**: Send the message (infallible after reserve)
196///
197/// # Cancel-Safety
198///
199/// The reservation phase is the cancellation point. Once you have a permit,
200/// the send will complete. This ensures no message loss on cancellation:
201///
202/// ```ignore
203/// // Cancel-safe pattern:
204/// let permit = transport.reserve_send(cx)?; // Can be cancelled here
205/// permit.send(message); // Always succeeds
206/// ```
207///
208/// # Example
209///
210/// ```ignore
211/// use fastmcp_transport::{AsyncStdioTransport, TwoPhaseTransport};
212/// use asupersync::Cx;
213///
214/// let mut transport = AsyncStdioTransport::new();
215/// let cx = Cx::for_testing();
216///
217/// // Reserve a send slot (cancellable)
218/// let permit = transport.reserve_send(&cx)?;
219///
220/// // At this point, we're committed - send is infallible
221/// permit.send(&JsonRpcMessage::Request(request));
222/// ```
223pub struct SendPermit<'a, W: std::io::Write> {
224 writer: &'a mut W,
225 codec: &'a Codec,
226}
227
228impl<'a, W: std::io::Write> SendPermit<'a, W> {
229 /// Creates a new send permit.
230 ///
231 /// This is an internal constructor. Use `TwoPhaseTransport::reserve_send()`
232 /// to obtain a permit.
233 fn new(writer: &'a mut W, codec: &'a Codec) -> Self {
234 Self { writer, codec }
235 }
236
237 /// Commits the send by writing the message.
238 ///
239 /// This method is synchronous and, from the protocol's perspective,
240 /// infallible after reservation. I/O errors are returned but the
241 /// reservation is consumed regardless.
242 ///
243 /// # Errors
244 ///
245 /// Returns an error if the underlying write fails. However, the permit
246 /// is consumed and the reservation is released.
247 pub fn send(self, message: &JsonRpcMessage) -> Result<(), TransportError> {
248 let bytes = match message {
249 JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
250 JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
251 };
252
253 self.writer.write_all(&bytes)?;
254 self.writer.flush()?;
255 Ok(())
256 }
257
258 /// Commits the send by writing a request.
259 ///
260 /// Convenience method for sending a request directly.
261 ///
262 /// # Errors
263 ///
264 /// Returns an error if the underlying write fails.
265 pub fn send_request(self, request: &JsonRpcRequest) -> Result<(), TransportError> {
266 let bytes = self.codec.encode_request(request)?;
267 self.writer.write_all(&bytes)?;
268 self.writer.flush()?;
269 Ok(())
270 }
271
272 /// Commits the send by writing a response.
273 ///
274 /// Convenience method for sending a response directly.
275 ///
276 /// # Errors
277 ///
278 /// Returns an error if the underlying write fails.
279 pub fn send_response(self, response: &JsonRpcResponse) -> Result<(), TransportError> {
280 let bytes = self.codec.encode_response(response)?;
281 self.writer.write_all(&bytes)?;
282 self.writer.flush()?;
283 Ok(())
284 }
285}
286
287/// Extension trait for two-phase send operations.
288///
289/// This trait adds the reserve/commit pattern to transports. The pattern
290/// ensures cancel-safety by making the reservation the cancellation point:
291///
292/// - **Reserve phase**: Check cancellation, allocate resources
293/// - **Commit phase**: Actually send (synchronous, infallible from protocol perspective)
294///
295/// # Why Two-Phase?
296///
297/// Without two-phase:
298/// ```ignore
299/// // BROKEN: Can lose messages on cancel
300/// async fn bad_send(cx: &Cx, msg: Message) {
301/// let serialized = serialize(&msg); // Work done
302/// cx.checkpoint()?; // Cancel here = lost message!
303/// writer.write(&serialized).await;
304/// }
305/// ```
306///
307/// With two-phase:
308/// ```ignore
309/// // CORRECT: Either fully sent or not started
310/// async fn good_send(cx: &Cx, msg: Message) {
311/// let permit = transport.reserve_send(cx)?; // Cancel here = no work lost
312/// // After reserve, commit is synchronous and infallible
313/// permit.send(msg);
314/// }
315/// ```
316pub trait TwoPhaseTransport: Transport {
317 /// The writer type for permits.
318 type Writer: std::io::Write;
319
320 /// Reserve a send slot.
321 ///
322 /// This is the cancellation point for sends. If this succeeds, the
323 /// subsequent `permit.send()` will complete.
324 ///
325 /// # Errors
326 ///
327 /// Returns `TransportError::Cancelled` if the request has been cancelled.
328 fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError>;
329}