Skip to main content

fastmcp_transport/
stdio.rs

1//! Standard I/O transport for MCP.
2//!
3//! This is the primary transport for MCP servers running as subprocess.
4//! Uses newline-delimited JSON (NDJSON) framing.
5//!
6//! # Cancel-Safety
7//!
8//! The stdio transport integrates with asupersync's capability context:
9//! - Checks `cx.is_cancel_requested()` before blocking operations
10//! - Uses async I/O wrappers that integrate with cancellation
11//! - Properly handles EOF as transport closure
12//!
13//! # Async I/O Integration
14//!
15//! This module provides two transport implementations:
16//!
17//! - [`StdioTransport`]: Generic transport for any `Read`/`Write` types (for testing)
18//! - [`AsyncStdioTransport`]: Production transport using async I/O wrappers
19//!
20//! # Example
21//!
22//! ```ignore
23//! use fastmcp_transport::{AsyncStdioTransport, Transport};
24//! use asupersync::Cx;
25//!
26//! fn main() {
27//!     let mut transport = AsyncStdioTransport::new();
28//!     let cx = Cx::for_testing();
29//!
30//!     loop {
31//!         match transport.recv(&cx) {
32//!             Ok(msg) => handle_message(msg),
33//!             Err(TransportError::Closed) => break,
34//!             Err(TransportError::Cancelled) => break,
35//!             Err(e) => eprintln!("Error: {}", e),
36//!         }
37//!     }
38//! }
39//! ```
40
41use std::io::{BufRead, BufReader, Read, Write};
42
43use asupersync::{Budget, Cx};
44use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
45
46use crate::async_io::{AsyncLineReader, AsyncStdout};
47use crate::{Codec, CodecError, SendPermit, Transport, TransportError, TwoPhaseTransport};
48
49/// Stdio transport implementation.
50///
51/// Reads from stdin and writes to stdout using NDJSON framing.
52/// Integrates with asupersync for cancel-correct operation.
53///
54/// # Wire Format
55///
56/// Messages are newline-delimited JSON:
57/// - Each message is serialized as a single line of JSON
58/// - Lines are terminated by `\n` (LF, not CRLF)
59/// - Empty lines are ignored
60/// - UTF-8 encoding is required
61pub struct StdioTransport<R, W> {
62    reader: BufReader<R>,
63    writer: W,
64    codec: Codec,
65    line_buffer: String,
66}
67
68impl<R: Read, W: Write> StdioTransport<R, W> {
69    /// Creates a new stdio transport with custom reader/writer.
70    ///
71    /// This is useful for testing with mock I/O.
72    #[must_use]
73    pub fn new(reader: R, writer: W) -> Self {
74        Self {
75            reader: BufReader::new(reader),
76            writer,
77            codec: Codec::new(),
78            line_buffer: String::with_capacity(4096),
79        }
80    }
81
82    /// Encodes and sends a message, appending newline.
83    fn write_message(&mut self, message: &JsonRpcMessage) -> Result<(), TransportError> {
84        let bytes = match message {
85            JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
86            JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
87        };
88        self.writer.write_all(&bytes)?;
89        self.writer.flush()?;
90        Ok(())
91    }
92
93    /// Reads a line from the reader, handling EOF.
94    fn read_line(&mut self) -> Result<&str, TransportError> {
95        self.line_buffer.clear();
96        let bytes_read = self.reader.read_line(&mut self.line_buffer)?;
97
98        if bytes_read == 0 {
99            return Err(TransportError::Closed);
100        }
101
102        // Trim trailing newline
103        let line_len = {
104            let line = self
105                .line_buffer
106                .trim_end_matches('\n')
107                .trim_end_matches('\r');
108            line.len()
109        };
110        if line_len > self.codec.max_message_size() {
111            self.line_buffer.clear();
112            return Err(TransportError::Codec(CodecError::MessageTooLarge(line_len)));
113        }
114        let line = self
115            .line_buffer
116            .trim_end_matches('\n')
117            .trim_end_matches('\r');
118        Ok(line)
119    }
120}
121
122impl StdioTransport<std::io::Stdin, std::io::Stdout> {
123    /// Creates a transport using standard stdin/stdout.
124    ///
125    /// This is the primary constructor for MCP servers running as subprocess.
126    #[must_use]
127    pub fn stdio() -> Self {
128        Self::new(std::io::stdin(), std::io::stdout())
129    }
130}
131
132impl<R: Read, W: Write> Transport for StdioTransport<R, W> {
133    fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
134        // Check for cancellation before I/O
135        if cx.is_cancel_requested() {
136            return Err(TransportError::Cancelled);
137        }
138
139        self.write_message(message)
140    }
141
142    fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
143        // Check for cancellation before blocking read
144        if cx.is_cancel_requested() {
145            return Err(TransportError::Cancelled);
146        }
147
148        // Read lines until we get a non-empty one
149        loop {
150            let line = self.read_line()?;
151
152            // Skip empty lines
153            if line.is_empty() {
154                // Check cancellation between reads
155                if cx.is_cancel_requested() {
156                    return Err(TransportError::Cancelled);
157                }
158                continue;
159            }
160
161            // Parse the JSON message
162            let message: JsonRpcMessage = serde_json::from_str(line)
163                .map_err(|e| TransportError::Codec(crate::CodecError::Json(e)))?;
164
165            return Ok(message);
166        }
167    }
168
169    fn close(&mut self) -> Result<(), TransportError> {
170        self.writer.flush()?;
171        Ok(())
172    }
173}
174
175/// Helper to create request/response without cloning for internal use.
176impl<R: Read, W: Write> StdioTransport<R, W> {
177    /// Send a request directly (avoids clone in trait method).
178    pub fn send_request_direct(
179        &mut self,
180        cx: &Cx,
181        request: &JsonRpcRequest,
182    ) -> Result<(), TransportError> {
183        if cx.is_cancel_requested() {
184            return Err(TransportError::Cancelled);
185        }
186        let bytes = self.codec.encode_request(request)?;
187        self.writer.write_all(&bytes)?;
188        self.writer.flush()?;
189        Ok(())
190    }
191
192    /// Send a response directly (avoids clone in trait method).
193    pub fn send_response_direct(
194        &mut self,
195        cx: &Cx,
196        response: &JsonRpcResponse,
197    ) -> Result<(), TransportError> {
198        if cx.is_cancel_requested() {
199            return Err(TransportError::Cancelled);
200        }
201        let bytes = self.codec.encode_response(response)?;
202        self.writer.write_all(&bytes)?;
203        self.writer.flush()?;
204        Ok(())
205    }
206}
207
208impl<R: Read, W: Write> TwoPhaseTransport for StdioTransport<R, W> {
209    type Writer = W;
210
211    fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError> {
212        // Check cancellation - this is the cancellation point
213        if cx.is_cancel_requested() {
214            return Err(TransportError::Cancelled);
215        }
216
217        // Return permit that allows the send to proceed
218        Ok(SendPermit::new(&mut self.writer, &self.codec))
219    }
220}
221
222// =============================================================================
223// AsyncStdioTransport - Production async I/O transport
224// =============================================================================
225
226/// Async stdio transport with integrated cancellation support.
227///
228/// This is the production transport for MCP servers. It uses async I/O
229/// wrappers that integrate with asupersync's capability context for
230/// proper cancellation handling.
231///
232/// # Cancel-Safety
233///
234/// - Checks `cx.is_cancel_requested()` before and during blocking I/O
235/// - Returns `TransportError::Cancelled` when cancellation is detected
236/// - Integrates with asupersync's structured concurrency model
237///
238/// # Example
239///
240/// ```ignore
241/// use fastmcp_transport::{AsyncStdioTransport, Transport};
242/// use asupersync::Cx;
243///
244/// let mut transport = AsyncStdioTransport::new();
245/// let cx = Cx::for_testing();
246///
247/// // Receive messages until EOF or cancellation
248/// loop {
249///     match transport.recv(&cx) {
250///         Ok(msg) => process_message(msg),
251///         Err(TransportError::Closed) => break,
252///         Err(TransportError::Cancelled) => {
253///             eprintln!("Request cancelled");
254///             break;
255///         }
256///         Err(e) => return Err(e),
257///     }
258/// }
259/// ```
260pub struct AsyncStdioTransport {
261    reader: AsyncLineReader,
262    writer: AsyncStdout,
263    codec: Codec,
264}
265
266impl AsyncStdioTransport {
267    /// Creates a new async stdio transport.
268    ///
269    /// This is the primary constructor for MCP servers running as subprocess.
270    /// Uses async I/O wrappers that integrate with asupersync's cancellation.
271    #[must_use]
272    pub fn new() -> Self {
273        Self {
274            reader: AsyncLineReader::new(),
275            writer: AsyncStdout::new(),
276            codec: Codec::new(),
277        }
278    }
279}
280
281impl Default for AsyncStdioTransport {
282    fn default() -> Self {
283        Self::new()
284    }
285}
286
287impl Transport for AsyncStdioTransport {
288    fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
289        // Check for cancellation before I/O
290        if cx.is_cancel_requested() {
291            return Err(TransportError::Cancelled);
292        }
293
294        let bytes = match message {
295            JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
296            JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
297        };
298
299        // Use async-aware write with cancellation checking
300        self.writer.write_all_sync(cx, &bytes).map_err(|e| {
301            if e.kind() == std::io::ErrorKind::Interrupted {
302                TransportError::Cancelled
303            } else {
304                TransportError::Io(e)
305            }
306        })?;
307
308        self.writer.flush_sync(cx).map_err(|e| {
309            if e.kind() == std::io::ErrorKind::Interrupted {
310                TransportError::Cancelled
311            } else {
312                TransportError::Io(e)
313            }
314        })?;
315
316        Ok(())
317    }
318
319    fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
320        // Check for cancellation before blocking read
321        if cx.is_cancel_requested() {
322            return Err(TransportError::Cancelled);
323        }
324
325        // Read non-empty line with cancellation checking
326        let line = self
327            .reader
328            .read_non_empty_line(cx)
329            .map_err(|e| {
330                if e.kind() == std::io::ErrorKind::Interrupted {
331                    TransportError::Cancelled
332                } else {
333                    TransportError::Io(e)
334                }
335            })?
336            .ok_or(TransportError::Closed)?;
337
338        if line.len() > self.codec.max_message_size() {
339            return Err(TransportError::Codec(CodecError::MessageTooLarge(
340                line.len(),
341            )));
342        }
343
344        // Parse the JSON message
345        let message: JsonRpcMessage = serde_json::from_str(&line)
346            .map_err(|e| TransportError::Codec(crate::CodecError::Json(e)))?;
347
348        Ok(message)
349    }
350
351    fn close(&mut self) -> Result<(), TransportError> {
352        // Use an infinite budget context for close - ensures flush completes
353        // without cancellation (close should always complete)
354        let cx = Cx::for_request_with_budget(Budget::INFINITE);
355        self.writer.flush_sync(&cx)?;
356        Ok(())
357    }
358}
359
360impl AsyncStdioTransport {
361    /// Send a request directly (avoids clone in trait method).
362    pub fn send_request_direct(
363        &mut self,
364        cx: &Cx,
365        request: &JsonRpcRequest,
366    ) -> Result<(), TransportError> {
367        if cx.is_cancel_requested() {
368            return Err(TransportError::Cancelled);
369        }
370
371        let bytes = self.codec.encode_request(request)?;
372
373        self.writer.write_all_sync(cx, &bytes).map_err(|e| {
374            if e.kind() == std::io::ErrorKind::Interrupted {
375                TransportError::Cancelled
376            } else {
377                TransportError::Io(e)
378            }
379        })?;
380
381        self.writer.flush_sync(cx).map_err(|e| {
382            if e.kind() == std::io::ErrorKind::Interrupted {
383                TransportError::Cancelled
384            } else {
385                TransportError::Io(e)
386            }
387        })
388    }
389
390    /// Send a response directly (avoids clone in trait method).
391    pub fn send_response_direct(
392        &mut self,
393        cx: &Cx,
394        response: &JsonRpcResponse,
395    ) -> Result<(), TransportError> {
396        if cx.is_cancel_requested() {
397            return Err(TransportError::Cancelled);
398        }
399
400        let bytes = self.codec.encode_response(response)?;
401
402        self.writer.write_all_sync(cx, &bytes).map_err(|e| {
403            if e.kind() == std::io::ErrorKind::Interrupted {
404                TransportError::Cancelled
405            } else {
406                TransportError::Io(e)
407            }
408        })?;
409
410        self.writer.flush_sync(cx).map_err(|e| {
411            if e.kind() == std::io::ErrorKind::Interrupted {
412                TransportError::Cancelled
413            } else {
414                TransportError::Io(e)
415            }
416        })
417    }
418}
419
420impl TwoPhaseTransport for AsyncStdioTransport {
421    type Writer = AsyncStdout;
422
423    fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError> {
424        // Check cancellation - this is the cancellation point
425        if cx.is_cancel_requested() {
426            return Err(TransportError::Cancelled);
427        }
428
429        // Return permit that allows the send to proceed
430        // The commit phase uses Write trait impl which bypasses cancellation checks
431        Ok(SendPermit::new(&mut self.writer, &self.codec))
432    }
433}
434
435#[cfg(test)]
436mod tests {
437    use super::*;
438    use std::io::Cursor;
439
440    #[test]
441    fn test_send_receive_roundtrip() {
442        // Create a transport with a buffer as both reader and writer
443        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n";
444        let reader = Cursor::new(input.to_vec());
445        let writer = Vec::new();
446
447        let mut transport = StdioTransport::new(reader, writer);
448
449        // Use Cx::for_testing() for unit tests
450        let cx = Cx::for_testing();
451        let msg = transport.recv(&cx).unwrap();
452        assert!(matches!(&msg, JsonRpcMessage::Request(_)));
453        if let JsonRpcMessage::Request(req) = msg {
454            assert_eq!(req.method, "test");
455        }
456    }
457
458    #[test]
459    fn test_send_message() {
460        let reader = Cursor::new(Vec::new());
461        let writer = Vec::new();
462
463        let mut transport = StdioTransport::new(reader, writer);
464
465        let cx = Cx::for_testing();
466        let request = JsonRpcRequest::new("test/method", None, 1i64);
467        transport.send_request_direct(&cx, &request).unwrap();
468    }
469
470    #[test]
471    fn test_eof_returns_closed() {
472        // Empty input = immediate EOF
473        let reader = Cursor::new(Vec::new());
474        let writer = Vec::new();
475
476        let mut transport = StdioTransport::new(reader, writer);
477
478        let cx = Cx::for_testing();
479        let result = transport.recv(&cx);
480        assert!(matches!(result, Err(TransportError::Closed)));
481    }
482
483    #[test]
484    fn test_skip_empty_lines() {
485        // Input with empty lines before the actual message
486        let input = b"\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n";
487        let reader = Cursor::new(input.to_vec());
488        let writer = Vec::new();
489
490        let mut transport = StdioTransport::new(reader, writer);
491
492        let cx = Cx::for_testing();
493        let msg = transport.recv(&cx).unwrap();
494        assert!(matches!(&msg, JsonRpcMessage::Request(_)));
495        if let JsonRpcMessage::Request(req) = msg {
496            assert_eq!(req.method, "test");
497        }
498    }
499
500    #[test]
501    fn test_recv_rejects_oversized_line() {
502        let request = JsonRpcRequest::new("test/method", None, 1i64);
503        let line = serde_json::to_vec(&request).unwrap();
504        let mut input = line.clone();
505        input.push(b'\n');
506        let reader = Cursor::new(input);
507        let writer = Vec::new();
508
509        let mut transport = StdioTransport::new(reader, writer);
510        transport
511            .codec
512            .set_max_message_size(line.len().saturating_sub(1));
513
514        let cx = Cx::for_testing();
515        let result = transport.recv(&cx);
516        assert!(matches!(
517            result,
518            Err(TransportError::Codec(CodecError::MessageTooLarge(_)))
519        ));
520    }
521
522    #[test]
523    fn test_cancellation_on_recv() {
524        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n";
525        let reader = Cursor::new(input.to_vec());
526        let writer = Vec::new();
527
528        let mut transport = StdioTransport::new(reader, writer);
529
530        let cx = Cx::for_testing();
531        cx.set_cancel_requested(true);
532
533        let result = transport.recv(&cx);
534        assert!(matches!(result, Err(TransportError::Cancelled)));
535    }
536
537    #[test]
538    fn test_cancellation_on_send() {
539        let reader = Cursor::new(Vec::new());
540        let writer = Vec::new();
541
542        let mut transport = StdioTransport::new(reader, writer);
543
544        let cx = Cx::for_testing();
545        cx.set_cancel_requested(true);
546
547        let request = JsonRpcRequest::new("test/method", None, 1i64);
548        let result = transport.send_request_direct(&cx, &request);
549        assert!(matches!(result, Err(TransportError::Cancelled)));
550    }
551
552    #[test]
553    fn test_two_phase_send_success() {
554        let reader = Cursor::new(Vec::new());
555        let writer = Vec::new();
556
557        let mut transport = StdioTransport::new(reader, writer);
558
559        let cx = Cx::for_testing();
560
561        // Reserve a send slot
562        let permit = transport.reserve_send(&cx).unwrap();
563
564        // Send a request via the permit
565        let request = JsonRpcRequest::new("test/method", None, 1i64);
566        permit.send_request(&request).unwrap();
567    }
568
569    #[test]
570    fn test_two_phase_send_cancellation_on_reserve() {
571        let reader = Cursor::new(Vec::new());
572        let writer = Vec::new();
573
574        let mut transport = StdioTransport::new(reader, writer);
575
576        let cx = Cx::for_testing();
577        cx.set_cancel_requested(true);
578
579        // Reservation should fail when cancelled
580        let result = transport.reserve_send(&cx);
581        assert!(matches!(result, Err(TransportError::Cancelled)));
582    }
583
584    #[test]
585    fn test_two_phase_send_message() {
586        let reader = Cursor::new(Vec::new());
587        let writer = Vec::new();
588
589        let mut transport = StdioTransport::new(reader, writer);
590
591        let cx = Cx::for_testing();
592
593        // Reserve and send using the generic send method
594        let permit = transport.reserve_send(&cx).unwrap();
595        let request = JsonRpcRequest::new("test/method", None, 1i64);
596        let message = JsonRpcMessage::Request(request);
597        permit.send(&message).unwrap();
598    }
599
600    // =========================================================================
601    // E2E Stdio NDJSON Tests (bd-2kv / bd-swyn)
602    // =========================================================================
603
604    #[test]
605    fn e2e_ndjson_multiple_messages_in_sequence() {
606        // Simulate multiple JSON-RPC messages in NDJSON format
607        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"init\",\"id\":1}\n\
608                      {\"jsonrpc\":\"2.0\",\"method\":\"tools/list\",\"id\":2}\n\
609                      {\"jsonrpc\":\"2.0\",\"method\":\"tools/call\",\"params\":{\"name\":\"test\"},\"id\":3}\n";
610
611        let reader = Cursor::new(input.to_vec());
612        let writer = Vec::new();
613        let mut transport = StdioTransport::new(reader, writer);
614        let cx = Cx::for_testing();
615
616        // Receive first message
617        let msg1 = transport.recv(&cx).unwrap();
618        assert!(
619            matches!(msg1, JsonRpcMessage::Request(_)),
620            "Expected request"
621        );
622        let JsonRpcMessage::Request(req) = msg1 else {
623            return;
624        };
625        assert_eq!(req.method, "init");
626
627        // Receive second message
628        let msg2 = transport.recv(&cx).unwrap();
629        assert!(
630            matches!(msg2, JsonRpcMessage::Request(_)),
631            "Expected request"
632        );
633        let JsonRpcMessage::Request(req) = msg2 else {
634            return;
635        };
636        assert_eq!(req.method, "tools/list");
637
638        // Receive third message
639        let msg3 = transport.recv(&cx).unwrap();
640        assert!(
641            matches!(msg3, JsonRpcMessage::Request(_)),
642            "Expected request"
643        );
644        let JsonRpcMessage::Request(req) = msg3 else {
645            return;
646        };
647        assert_eq!(req.method, "tools/call");
648        assert!(req.params.is_some());
649
650        // Fourth recv should return EOF (Closed)
651        let result = transport.recv(&cx);
652        assert!(matches!(result, Err(TransportError::Closed)));
653    }
654
655    #[test]
656    fn e2e_ndjson_request_response_flow() {
657        // Test a typical request/response flow
658        let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"success\":true},\"id\":1}\n";
659
660        let reader = Cursor::new(input.to_vec());
661        let mut output = Vec::new();
662        let mut transport = StdioTransport::new(reader, Cursor::new(&mut output));
663        let cx = Cx::for_testing();
664
665        // Send a request
666        let request = JsonRpcRequest::new(
667            "test/method",
668            Some(serde_json::json!({"key": "value"})),
669            1i64,
670        );
671        transport.send_request_direct(&cx, &request).unwrap();
672
673        // Receive response
674        let msg = transport.recv(&cx).unwrap();
675        assert!(
676            matches!(msg, JsonRpcMessage::Response(_)),
677            "Expected response"
678        );
679        let JsonRpcMessage::Response(resp) = msg else {
680            return;
681        };
682        assert!(resp.result.is_some());
683        assert!(resp.error.is_none());
684    }
685
686    #[test]
687    fn e2e_ndjson_handles_mixed_empty_lines() {
688        // NDJSON should skip empty lines
689        let input = b"\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test1\",\"id\":1}\n\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test2\",\"id\":2}\n\n";
690
691        let reader = Cursor::new(input.to_vec());
692        let writer = Vec::new();
693        let mut transport = StdioTransport::new(reader, writer);
694        let cx = Cx::for_testing();
695
696        // Should receive both messages despite empty lines
697        let msg1 = transport.recv(&cx).unwrap();
698        assert!(
699            matches!(msg1, JsonRpcMessage::Request(_)),
700            "Expected request"
701        );
702        let JsonRpcMessage::Request(req) = msg1 else {
703            return;
704        };
705        assert_eq!(req.method, "test1");
706
707        let msg2 = transport.recv(&cx).unwrap();
708        assert!(
709            matches!(msg2, JsonRpcMessage::Request(_)),
710            "Expected request"
711        );
712        let JsonRpcMessage::Request(req) = msg2 else {
713            return;
714        };
715        assert_eq!(req.method, "test2");
716    }
717
718    #[test]
719    fn e2e_ndjson_handles_unicode_content() {
720        // Test UTF-8 handling in NDJSON
721        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{\"message\":\"\xC3\xA9\xC3\xA8\xC3\xAA\xE4\xB8\xAD\xE6\x96\x87\xF0\x9F\x91\x8B\"},\"id\":1}\n";
722
723        let reader = Cursor::new(input.to_vec());
724        let writer = Vec::new();
725        let mut transport = StdioTransport::new(reader, writer);
726        let cx = Cx::for_testing();
727
728        let msg = transport.recv(&cx).unwrap();
729        assert!(
730            matches!(msg, JsonRpcMessage::Request(_)),
731            "Expected request"
732        );
733        let JsonRpcMessage::Request(req) = msg else {
734            return;
735        };
736        assert_eq!(req.method, "test");
737        let params = req.params.as_ref().unwrap();
738        let message = params.get("message").unwrap().as_str().unwrap();
739        // Contains: éèê中文👋
740        assert!(message.contains("é"));
741        assert!(message.contains("中"));
742        assert!(message.contains("👋"));
743    }
744
745    #[test]
746    fn e2e_ndjson_large_message() {
747        // Test handling of larger messages
748        let large_data = "x".repeat(100_000);
749        let message = format!(
750            "{{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{{\"data\":\"{}\"}},\"id\":1}}\n",
751            large_data
752        );
753
754        let reader = Cursor::new(message.into_bytes());
755        let writer = Vec::new();
756        let mut transport = StdioTransport::new(reader, writer);
757        let cx = Cx::for_testing();
758
759        let msg = transport.recv(&cx).unwrap();
760        assert!(
761            matches!(msg, JsonRpcMessage::Request(_)),
762            "Expected request"
763        );
764        let JsonRpcMessage::Request(req) = msg else {
765            return;
766        };
767        assert_eq!(req.method, "test");
768        let params = req.params.as_ref().unwrap();
769        let data = params.get("data").unwrap().as_str().unwrap();
770        assert_eq!(data.len(), 100_000);
771    }
772
773    #[test]
774    fn e2e_ndjson_notification() {
775        // Test JSON-RPC notifications (requests without id)
776        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}\n";
777
778        let reader = Cursor::new(input.to_vec());
779        let writer = Vec::new();
780        let mut transport = StdioTransport::new(reader, writer);
781        let cx = Cx::for_testing();
782
783        let msg = transport.recv(&cx).unwrap();
784        assert!(
785            matches!(msg, JsonRpcMessage::Request(_)),
786            "Expected request/notification"
787        );
788        let JsonRpcMessage::Request(req) = msg else {
789            return;
790        };
791        assert_eq!(req.method, "notifications/initialized");
792        assert!(req.id.is_none());
793    }
794
795    #[test]
796    fn e2e_ndjson_error_response() {
797        // Test JSON-RPC error response parsing
798        let input = b"{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32601,\"message\":\"Method not found\"},\"id\":1}\n";
799
800        let reader = Cursor::new(input.to_vec());
801        let writer = Vec::new();
802        let mut transport = StdioTransport::new(reader, writer);
803        let cx = Cx::for_testing();
804
805        let msg = transport.recv(&cx).unwrap();
806        assert!(
807            matches!(msg, JsonRpcMessage::Response(_)),
808            "Expected response"
809        );
810        let JsonRpcMessage::Response(resp) = msg else {
811            return;
812        };
813        assert!(resp.result.is_none());
814        assert!(resp.error.is_some());
815        let error = resp.error.unwrap();
816        assert_eq!(error.code, -32601);
817        assert_eq!(error.message, "Method not found");
818    }
819
820    #[test]
821    fn e2e_ndjson_malformed_json_error() {
822        // Test handling of malformed JSON
823        let input = b"{invalid json\n";
824
825        let reader = Cursor::new(input.to_vec());
826        let writer = Vec::new();
827        let mut transport = StdioTransport::new(reader, writer);
828        let cx = Cx::for_testing();
829
830        let result = transport.recv(&cx);
831        assert!(matches!(result, Err(TransportError::Codec(_))));
832    }
833
834    #[test]
835    fn e2e_ndjson_bidirectional_flow() {
836        // Test bidirectional communication (simulated)
837        let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[]},\"id\":1}\n";
838        let reader = Cursor::new(input.to_vec());
839        let mut output = Vec::new();
840
841        // Create transport with a writeable output buffer
842        {
843            let mut transport = StdioTransport::new(reader, &mut output);
844            let cx = Cx::for_testing();
845
846            // Send a request
847            let request = JsonRpcRequest::new("tools/list", None, 1i64);
848            transport.send_request_direct(&cx, &request).unwrap();
849
850            // Receive response
851            let msg = transport.recv(&cx).unwrap();
852            assert!(matches!(msg, JsonRpcMessage::Response(_)));
853        }
854
855        // Verify the sent message is valid NDJSON
856        let sent = String::from_utf8(output).unwrap();
857        assert!(sent.ends_with('\n'));
858        assert!(sent.contains("\"method\":\"tools/list\""));
859        assert!(sent.contains("\"jsonrpc\":\"2.0\""));
860    }
861
862    #[test]
863    fn e2e_ndjson_response_with_complex_result() {
864        // Test response with complex nested result
865        let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[{\"name\":\"tool1\",\"description\":\"A test tool\",\"inputSchema\":{\"type\":\"object\"}}]},\"id\":1}\n";
866
867        let reader = Cursor::new(input.to_vec());
868        let writer = Vec::new();
869        let mut transport = StdioTransport::new(reader, writer);
870        let cx = Cx::for_testing();
871
872        let msg = transport.recv(&cx).unwrap();
873        assert!(
874            matches!(msg, JsonRpcMessage::Response(_)),
875            "Expected response"
876        );
877        let JsonRpcMessage::Response(resp) = msg else {
878            return;
879        };
880        let result = resp.result.unwrap();
881        let tools = result.get("tools").unwrap().as_array().unwrap();
882        assert_eq!(tools.len(), 1);
883        assert_eq!(tools[0].get("name").unwrap(), "tool1");
884    }
885
886    #[test]
887    fn e2e_two_phase_send_multiple_messages() {
888        // Test multiple two-phase sends in sequence
889        let reader = Cursor::new(Vec::new());
890        let mut output = Vec::new();
891
892        {
893            let mut transport = StdioTransport::new(reader, &mut output);
894            let cx = Cx::for_testing();
895
896            // Send multiple messages using two-phase pattern
897            for i in 1..=5 {
898                let permit = transport.reserve_send(&cx).unwrap();
899                let request = JsonRpcRequest::new(format!("method_{i}"), None, i as i64);
900                permit.send_request(&request).unwrap();
901            }
902        }
903
904        // Verify all messages were sent as valid NDJSON
905        let sent = String::from_utf8(output).unwrap();
906        let lines: Vec<&str> = sent.lines().collect();
907        assert_eq!(lines.len(), 5);
908
909        for (i, line) in lines.iter().enumerate() {
910            let expected_method = format!("method_{}", i + 1);
911            assert!(line.contains(&expected_method));
912        }
913    }
914
915    #[test]
916    fn e2e_transport_close_flushes() {
917        let reader = Cursor::new(Vec::new());
918        let mut output = Vec::new();
919
920        {
921            let mut transport = StdioTransport::new(reader, &mut output);
922            let cx = Cx::for_testing();
923
924            // Send a message
925            let request = JsonRpcRequest::new("test", None, 1i64);
926            transport.send_request_direct(&cx, &request).unwrap();
927
928            // Close should flush
929            transport.close().unwrap();
930        }
931
932        // Verify data was flushed
933        let sent = String::from_utf8(output).unwrap();
934        assert!(!sent.is_empty());
935        assert!(sent.contains("\"method\":\"test\""));
936    }
937
938    // =========================================================================
939    // Additional coverage tests (bd-19vz)
940    // =========================================================================
941
942    #[test]
943    fn send_response_direct_writes_valid_ndjson() {
944        let reader = Cursor::new(Vec::new());
945        let mut output = Vec::new();
946
947        {
948            let mut transport = StdioTransport::new(reader, &mut output);
949            let cx = Cx::for_testing();
950
951            let response = JsonRpcResponse {
952                jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
953                result: Some(serde_json::json!({"ok": true})),
954                error: None,
955                id: Some(fastmcp_protocol::RequestId::Number(42)),
956            };
957            transport.send_response_direct(&cx, &response).unwrap();
958        }
959
960        let sent = String::from_utf8(output).unwrap();
961        assert!(sent.ends_with('\n'));
962        assert!(sent.contains("\"result\""));
963        assert!(sent.contains("\"ok\":true") || sent.contains("\"ok\": true"));
964    }
965
966    #[test]
967    fn send_response_direct_cancelled() {
968        let reader = Cursor::new(Vec::new());
969        let writer = Vec::new();
970        let mut transport = StdioTransport::new(reader, writer);
971
972        let cx = Cx::for_testing();
973        cx.set_cancel_requested(true);
974
975        let response = JsonRpcResponse {
976            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
977            result: Some(serde_json::json!(null)),
978            error: None,
979            id: Some(fastmcp_protocol::RequestId::Number(1)),
980        };
981        let result = transport.send_response_direct(&cx, &response);
982        assert!(matches!(result, Err(TransportError::Cancelled)));
983    }
984
985    #[test]
986    fn transport_send_trait_method_with_response() {
987        let reader = Cursor::new(Vec::new());
988        let mut output = Vec::new();
989
990        {
991            let mut transport = StdioTransport::new(reader, &mut output);
992            let cx = Cx::for_testing();
993
994            let response = JsonRpcResponse {
995                jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
996                result: Some(serde_json::json!({"status": "done"})),
997                error: None,
998                id: Some(fastmcp_protocol::RequestId::Number(1)),
999            };
1000            transport
1001                .send(&cx, &JsonRpcMessage::Response(response))
1002                .unwrap();
1003        }
1004
1005        let sent = String::from_utf8(output).unwrap();
1006        assert!(sent.contains("\"status\""));
1007    }
1008
1009    #[test]
1010    fn transport_send_trait_cancelled() {
1011        let reader = Cursor::new(Vec::new());
1012        let writer = Vec::new();
1013        let mut transport = StdioTransport::new(reader, writer);
1014
1015        let cx = Cx::for_testing();
1016        cx.set_cancel_requested(true);
1017
1018        let request = JsonRpcRequest::new("test", None, 1i64);
1019        let result = transport.send(&cx, &JsonRpcMessage::Request(request));
1020        assert!(matches!(result, Err(TransportError::Cancelled)));
1021    }
1022
1023    #[test]
1024    fn two_phase_send_response_via_permit() {
1025        let reader = Cursor::new(Vec::new());
1026        let mut output = Vec::new();
1027
1028        {
1029            let mut transport = StdioTransport::new(reader, &mut output);
1030            let cx = Cx::for_testing();
1031
1032            let permit = transport.reserve_send(&cx).unwrap();
1033            let response = JsonRpcResponse {
1034                jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
1035                result: Some(serde_json::json!({"v": 1})),
1036                error: None,
1037                id: Some(fastmcp_protocol::RequestId::Number(1)),
1038            };
1039            permit.send_response(&response).unwrap();
1040        }
1041
1042        let sent = String::from_utf8(output).unwrap();
1043        assert!(sent.contains("\"result\""));
1044    }
1045
1046    #[test]
1047    fn recv_handles_crlf_line_endings() {
1048        // Windows-style CRLF should be handled correctly
1049        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\r\n";
1050        let reader = Cursor::new(input.to_vec());
1051        let writer = Vec::new();
1052        let mut transport = StdioTransport::new(reader, writer);
1053        let cx = Cx::for_testing();
1054
1055        let msg = transport.recv(&cx).unwrap();
1056        if let JsonRpcMessage::Request(req) = msg {
1057            assert_eq!(req.method, "test");
1058        } else {
1059            panic!("Expected request");
1060        }
1061    }
1062}