1use std::io::{BufRead, BufReader, Read, Write};
42
43use asupersync::{Budget, Cx};
44use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
45
46use crate::async_io::{AsyncLineReader, AsyncStdout};
47use crate::{Codec, CodecError, SendPermit, Transport, TransportError, TwoPhaseTransport};
48
/// Blocking, line-oriented JSON-RPC transport over a generic reader/writer pair.
///
/// Incoming messages are read one line at a time from `reader`; outgoing
/// messages are encoded by `codec`, written to `writer`, and flushed.
pub struct StdioTransport<R, W> {
    /// Buffered wrapper around the input stream that lines are read from.
    reader: BufReader<R>,
    /// Output sink that encoded messages are written to and flushed on.
    writer: W,
    /// Encodes outgoing requests/responses and supplies the maximum accepted message size.
    codec: Codec,
    /// Scratch buffer reused across reads; holds the most recently read line.
    line_buffer: String,
}
67
68impl<R: Read, W: Write> StdioTransport<R, W> {
69 #[must_use]
73 pub fn new(reader: R, writer: W) -> Self {
74 Self {
75 reader: BufReader::new(reader),
76 writer,
77 codec: Codec::new(),
78 line_buffer: String::with_capacity(4096),
79 }
80 }
81
82 fn write_message(&mut self, message: &JsonRpcMessage) -> Result<(), TransportError> {
84 let bytes = match message {
85 JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
86 JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
87 };
88 self.writer.write_all(&bytes)?;
89 self.writer.flush()?;
90 Ok(())
91 }
92
93 fn read_line(&mut self) -> Result<&str, TransportError> {
95 self.line_buffer.clear();
96 let bytes_read = self.reader.read_line(&mut self.line_buffer)?;
97
98 if bytes_read == 0 {
99 return Err(TransportError::Closed);
100 }
101
102 let line_len = {
104 let line = self
105 .line_buffer
106 .trim_end_matches('\n')
107 .trim_end_matches('\r');
108 line.len()
109 };
110 if line_len > self.codec.max_message_size() {
111 self.line_buffer.clear();
112 return Err(TransportError::Codec(CodecError::MessageTooLarge(line_len)));
113 }
114 let line = self
115 .line_buffer
116 .trim_end_matches('\n')
117 .trim_end_matches('\r');
118 Ok(line)
119 }
120}
121
122impl StdioTransport<std::io::Stdin, std::io::Stdout> {
123 #[must_use]
127 pub fn stdio() -> Self {
128 Self::new(std::io::stdin(), std::io::stdout())
129 }
130}
131
132impl<R: Read, W: Write> Transport for StdioTransport<R, W> {
133 fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
134 if cx.is_cancel_requested() {
136 return Err(TransportError::Cancelled);
137 }
138
139 self.write_message(message)
140 }
141
142 fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
143 if cx.is_cancel_requested() {
145 return Err(TransportError::Cancelled);
146 }
147
148 loop {
150 let line = self.read_line()?;
151
152 if line.is_empty() {
154 if cx.is_cancel_requested() {
156 return Err(TransportError::Cancelled);
157 }
158 continue;
159 }
160
161 let message: JsonRpcMessage = serde_json::from_str(line)
163 .map_err(|e| TransportError::Codec(crate::CodecError::Json(e)))?;
164
165 return Ok(message);
166 }
167 }
168
169 fn close(&mut self) -> Result<(), TransportError> {
170 self.writer.flush()?;
171 Ok(())
172 }
173}
174
175impl<R: Read, W: Write> StdioTransport<R, W> {
177 pub fn send_request_direct(
179 &mut self,
180 cx: &Cx,
181 request: &JsonRpcRequest,
182 ) -> Result<(), TransportError> {
183 if cx.is_cancel_requested() {
184 return Err(TransportError::Cancelled);
185 }
186 let bytes = self.codec.encode_request(request)?;
187 self.writer.write_all(&bytes)?;
188 self.writer.flush()?;
189 Ok(())
190 }
191
192 pub fn send_response_direct(
194 &mut self,
195 cx: &Cx,
196 response: &JsonRpcResponse,
197 ) -> Result<(), TransportError> {
198 if cx.is_cancel_requested() {
199 return Err(TransportError::Cancelled);
200 }
201 let bytes = self.codec.encode_response(response)?;
202 self.writer.write_all(&bytes)?;
203 self.writer.flush()?;
204 Ok(())
205 }
206}
207
208impl<R: Read, W: Write> TwoPhaseTransport for StdioTransport<R, W> {
209 type Writer = W;
210
211 fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError> {
212 if cx.is_cancel_requested() {
214 return Err(TransportError::Cancelled);
215 }
216
217 Ok(SendPermit::new(&mut self.writer, &self.codec))
219 }
220}
221
/// JSON-RPC transport built on the crate's `AsyncLineReader` and `AsyncStdout` helpers.
///
/// Reads and writes are given the caller's `Cx`, and `Interrupted` I/O errors
/// from them are reported as cancellation.
// NOTE(review): presumably bound to the process's stdin/stdout — confirm in `crate::async_io`.
pub struct AsyncStdioTransport {
    /// Source of incoming lines.
    reader: AsyncLineReader,
    /// Sink for encoded outgoing messages.
    writer: AsyncStdout,
    /// Encodes outgoing requests/responses and supplies the maximum accepted message size.
    codec: Codec,
}
265
266impl AsyncStdioTransport {
267 #[must_use]
272 pub fn new() -> Self {
273 Self {
274 reader: AsyncLineReader::new(),
275 writer: AsyncStdout::new(),
276 codec: Codec::new(),
277 }
278 }
279}
280
281impl Default for AsyncStdioTransport {
282 fn default() -> Self {
283 Self::new()
284 }
285}
286
287impl Transport for AsyncStdioTransport {
288 fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
289 if cx.is_cancel_requested() {
291 return Err(TransportError::Cancelled);
292 }
293
294 let bytes = match message {
295 JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
296 JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
297 };
298
299 self.writer.write_all_sync(cx, &bytes).map_err(|e| {
301 if e.kind() == std::io::ErrorKind::Interrupted {
302 TransportError::Cancelled
303 } else {
304 TransportError::Io(e)
305 }
306 })?;
307
308 self.writer.flush_sync(cx).map_err(|e| {
309 if e.kind() == std::io::ErrorKind::Interrupted {
310 TransportError::Cancelled
311 } else {
312 TransportError::Io(e)
313 }
314 })?;
315
316 Ok(())
317 }
318
319 fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
320 if cx.is_cancel_requested() {
322 return Err(TransportError::Cancelled);
323 }
324
325 let line = self
327 .reader
328 .read_non_empty_line(cx)
329 .map_err(|e| {
330 if e.kind() == std::io::ErrorKind::Interrupted {
331 TransportError::Cancelled
332 } else {
333 TransportError::Io(e)
334 }
335 })?
336 .ok_or(TransportError::Closed)?;
337
338 if line.len() > self.codec.max_message_size() {
339 return Err(TransportError::Codec(CodecError::MessageTooLarge(
340 line.len(),
341 )));
342 }
343
344 let message: JsonRpcMessage = serde_json::from_str(&line)
346 .map_err(|e| TransportError::Codec(crate::CodecError::Json(e)))?;
347
348 Ok(message)
349 }
350
351 fn close(&mut self) -> Result<(), TransportError> {
352 let cx = Cx::for_request_with_budget(Budget::INFINITE);
355 self.writer.flush_sync(&cx)?;
356 Ok(())
357 }
358}
359
360impl AsyncStdioTransport {
361 pub fn send_request_direct(
363 &mut self,
364 cx: &Cx,
365 request: &JsonRpcRequest,
366 ) -> Result<(), TransportError> {
367 if cx.is_cancel_requested() {
368 return Err(TransportError::Cancelled);
369 }
370
371 let bytes = self.codec.encode_request(request)?;
372
373 self.writer.write_all_sync(cx, &bytes).map_err(|e| {
374 if e.kind() == std::io::ErrorKind::Interrupted {
375 TransportError::Cancelled
376 } else {
377 TransportError::Io(e)
378 }
379 })?;
380
381 self.writer.flush_sync(cx).map_err(|e| {
382 if e.kind() == std::io::ErrorKind::Interrupted {
383 TransportError::Cancelled
384 } else {
385 TransportError::Io(e)
386 }
387 })
388 }
389
390 pub fn send_response_direct(
392 &mut self,
393 cx: &Cx,
394 response: &JsonRpcResponse,
395 ) -> Result<(), TransportError> {
396 if cx.is_cancel_requested() {
397 return Err(TransportError::Cancelled);
398 }
399
400 let bytes = self.codec.encode_response(response)?;
401
402 self.writer.write_all_sync(cx, &bytes).map_err(|e| {
403 if e.kind() == std::io::ErrorKind::Interrupted {
404 TransportError::Cancelled
405 } else {
406 TransportError::Io(e)
407 }
408 })?;
409
410 self.writer.flush_sync(cx).map_err(|e| {
411 if e.kind() == std::io::ErrorKind::Interrupted {
412 TransportError::Cancelled
413 } else {
414 TransportError::Io(e)
415 }
416 })
417 }
418}
419
420impl TwoPhaseTransport for AsyncStdioTransport {
421 type Writer = AsyncStdout;
422
423 fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError> {
424 if cx.is_cancel_requested() {
426 return Err(TransportError::Cancelled);
427 }
428
429 Ok(SendPermit::new(&mut self.writer, &self.codec))
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// In-memory transport used by most tests: canned input, owned `Vec<u8>` output.
    type MemoryTransport = StdioTransport<Cursor<Vec<u8>>, Vec<u8>>;

    /// Builds a transport that reads a copy of `input` and writes into a fresh `Vec`.
    fn transport_reading(input: &[u8]) -> MemoryTransport {
        StdioTransport::new(Cursor::new(input.to_vec()), Vec::new())
    }

    /// Builds a transport whose input is empty, so the first read hits end of input.
    fn transport_without_input() -> MemoryTransport {
        StdioTransport::new(Cursor::new(Vec::new()), Vec::new())
    }

    // ---- Basic send / receive -------------------------------------------------

    #[test]
    fn test_send_receive_roundtrip() {
        let mut transport =
            transport_reading(b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n");
        let cx = Cx::for_testing();

        let received = transport.recv(&cx).unwrap();
        assert!(matches!(&received, JsonRpcMessage::Request(_)));
        if let JsonRpcMessage::Request(request) = received {
            assert_eq!(request.method, "test");
        }
    }

    #[test]
    fn test_send_message() {
        let mut transport = transport_without_input();
        let cx = Cx::for_testing();

        let outgoing = JsonRpcRequest::new("test/method", None, 1i64);
        transport.send_request_direct(&cx, &outgoing).unwrap();
    }

    #[test]
    fn test_eof_returns_closed() {
        let mut transport = transport_without_input();
        let cx = Cx::for_testing();

        let outcome = transport.recv(&cx);
        assert!(matches!(outcome, Err(TransportError::Closed)));
    }

    #[test]
    fn test_skip_empty_lines() {
        let mut transport =
            transport_reading(b"\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n");
        let cx = Cx::for_testing();

        let received = transport.recv(&cx).unwrap();
        assert!(matches!(&received, JsonRpcMessage::Request(_)));
        if let JsonRpcMessage::Request(request) = received {
            assert_eq!(request.method, "test");
        }
    }

    #[test]
    fn test_recv_rejects_oversized_line() {
        let request = JsonRpcRequest::new("test/method", None, 1i64);
        let mut input = serde_json::to_vec(&request).unwrap();
        let payload_len = input.len();
        input.push(b'\n');

        let mut transport = transport_reading(&input);
        // Limit is one byte short of the payload, so the line must be rejected.
        transport
            .codec
            .set_max_message_size(payload_len.saturating_sub(1));

        let cx = Cx::for_testing();
        let outcome = transport.recv(&cx);
        assert!(matches!(
            outcome,
            Err(TransportError::Codec(CodecError::MessageTooLarge(_)))
        ));
    }

    // ---- Cancellation ---------------------------------------------------------

    #[test]
    fn test_cancellation_on_recv() {
        let mut transport =
            transport_reading(b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n");

        let cx = Cx::for_testing();
        cx.set_cancel_requested(true);

        let outcome = transport.recv(&cx);
        assert!(matches!(outcome, Err(TransportError::Cancelled)));
    }

    #[test]
    fn test_cancellation_on_send() {
        let mut transport = transport_without_input();

        let cx = Cx::for_testing();
        cx.set_cancel_requested(true);

        let outgoing = JsonRpcRequest::new("test/method", None, 1i64);
        let outcome = transport.send_request_direct(&cx, &outgoing);
        assert!(matches!(outcome, Err(TransportError::Cancelled)));
    }

    // ---- Two-phase send -------------------------------------------------------

    #[test]
    fn test_two_phase_send_success() {
        let mut transport = transport_without_input();
        let cx = Cx::for_testing();

        let permit = transport.reserve_send(&cx).unwrap();

        let outgoing = JsonRpcRequest::new("test/method", None, 1i64);
        permit.send_request(&outgoing).unwrap();
    }

    #[test]
    fn test_two_phase_send_cancellation_on_reserve() {
        let mut transport = transport_without_input();

        let cx = Cx::for_testing();
        cx.set_cancel_requested(true);

        let outcome = transport.reserve_send(&cx);
        assert!(matches!(outcome, Err(TransportError::Cancelled)));
    }

    #[test]
    fn test_two_phase_send_message() {
        let mut transport = transport_without_input();
        let cx = Cx::for_testing();

        let permit = transport.reserve_send(&cx).unwrap();
        let outgoing = JsonRpcMessage::Request(JsonRpcRequest::new("test/method", None, 1i64));
        permit.send(&outgoing).unwrap();
    }

    // ---- End-to-end NDJSON scenarios -----------------------------------------

    #[test]
    fn e2e_ndjson_multiple_messages_in_sequence() {
        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"init\",\"id\":1}\n\
            {\"jsonrpc\":\"2.0\",\"method\":\"tools/list\",\"id\":2}\n\
            {\"jsonrpc\":\"2.0\",\"method\":\"tools/call\",\"params\":{\"name\":\"test\"},\"id\":3}\n";

        let mut transport = transport_reading(input);
        let cx = Cx::for_testing();

        let first = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = first else {
            panic!("Expected request");
        };
        assert_eq!(request.method, "init");

        let second = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = second else {
            panic!("Expected request");
        };
        assert_eq!(request.method, "tools/list");

        let third = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = third else {
            panic!("Expected request");
        };
        assert_eq!(request.method, "tools/call");
        assert!(request.params.is_some());

        // Input is exhausted after the third message.
        let outcome = transport.recv(&cx);
        assert!(matches!(outcome, Err(TransportError::Closed)));
    }

    #[test]
    fn e2e_ndjson_request_response_flow() {
        let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"success\":true},\"id\":1}\n";

        let mut output = Vec::new();
        let mut transport =
            StdioTransport::new(Cursor::new(input.to_vec()), Cursor::new(&mut output));
        let cx = Cx::for_testing();

        let outgoing = JsonRpcRequest::new(
            "test/method",
            Some(serde_json::json!({"key": "value"})),
            1i64,
        );
        transport.send_request_direct(&cx, &outgoing).unwrap();

        let received = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Response(response) = received else {
            panic!("Expected response");
        };
        assert!(response.result.is_some());
        assert!(response.error.is_none());
    }

    #[test]
    fn e2e_ndjson_handles_mixed_empty_lines() {
        let input = b"\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test1\",\"id\":1}\n\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test2\",\"id\":2}\n\n";

        let mut transport = transport_reading(input);
        let cx = Cx::for_testing();

        let first = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = first else {
            panic!("Expected request");
        };
        assert_eq!(request.method, "test1");

        let second = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = second else {
            panic!("Expected request");
        };
        assert_eq!(request.method, "test2");
    }

    #[test]
    fn e2e_ndjson_handles_unicode_content() {
        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{\"message\":\"\xC3\xA9\xC3\xA8\xC3\xAA\xE4\xB8\xAD\xE6\x96\x87\xF0\x9F\x91\x8B\"},\"id\":1}\n";

        let mut transport = transport_reading(input);
        let cx = Cx::for_testing();

        let received = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = received else {
            panic!("Expected request");
        };
        assert_eq!(request.method, "test");

        let params = request.params.as_ref().unwrap();
        let text = params.get("message").unwrap().as_str().unwrap();
        assert!(text.contains("é"));
        assert!(text.contains("中"));
        assert!(text.contains("👋"));
    }

    #[test]
    fn e2e_ndjson_large_message() {
        let large_data = "x".repeat(100_000);
        let line = format!(
            "{{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{{\"data\":\"{}\"}},\"id\":1}}\n",
            large_data
        );

        let mut transport = StdioTransport::new(Cursor::new(line.into_bytes()), Vec::new());
        let cx = Cx::for_testing();

        let received = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = received else {
            panic!("Expected request");
        };
        assert_eq!(request.method, "test");

        let params = request.params.as_ref().unwrap();
        let data = params.get("data").unwrap().as_str().unwrap();
        assert_eq!(data.len(), 100_000);
    }

    #[test]
    fn e2e_ndjson_notification() {
        let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}\n";

        let mut transport = transport_reading(input);
        let cx = Cx::for_testing();

        let received = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = received else {
            panic!("Expected request/notification");
        };
        assert_eq!(request.method, "notifications/initialized");
        assert!(request.id.is_none());
    }

    #[test]
    fn e2e_ndjson_error_response() {
        let input = b"{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32601,\"message\":\"Method not found\"},\"id\":1}\n";

        let mut transport = transport_reading(input);
        let cx = Cx::for_testing();

        let received = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Response(response) = received else {
            panic!("Expected response");
        };
        assert!(response.result.is_none());
        assert!(response.error.is_some());

        let error = response.error.unwrap();
        assert_eq!(error.code, -32601);
        assert_eq!(error.message, "Method not found");
    }

    #[test]
    fn e2e_ndjson_malformed_json_error() {
        let mut transport = transport_reading(b"{invalid json\n");
        let cx = Cx::for_testing();

        let outcome = transport.recv(&cx);
        assert!(matches!(outcome, Err(TransportError::Codec(_))));
    }

    #[test]
    fn e2e_ndjson_bidirectional_flow() {
        let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[]},\"id\":1}\n";
        let reader = Cursor::new(input.to_vec());
        let mut output = Vec::new();

        {
            // The transport only borrows `output`, so it can be inspected afterwards.
            let mut transport = StdioTransport::new(reader, &mut output);
            let cx = Cx::for_testing();

            let outgoing = JsonRpcRequest::new("tools/list", None, 1i64);
            transport.send_request_direct(&cx, &outgoing).unwrap();

            let received = transport.recv(&cx).unwrap();
            assert!(matches!(received, JsonRpcMessage::Response(_)));
        }

        let sent = String::from_utf8(output).unwrap();
        assert!(sent.ends_with('\n'));
        assert!(sent.contains("\"method\":\"tools/list\""));
        assert!(sent.contains("\"jsonrpc\":\"2.0\""));
    }

    #[test]
    fn e2e_ndjson_response_with_complex_result() {
        let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[{\"name\":\"tool1\",\"description\":\"A test tool\",\"inputSchema\":{\"type\":\"object\"}}]},\"id\":1}\n";

        let mut transport = transport_reading(input);
        let cx = Cx::for_testing();

        let received = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Response(response) = received else {
            panic!("Expected response");
        };

        let result = response.result.unwrap();
        let tools = result.get("tools").unwrap().as_array().unwrap();
        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0].get("name").unwrap(), "tool1");
    }

    #[test]
    fn e2e_two_phase_send_multiple_messages() {
        let reader = Cursor::new(Vec::new());
        let mut output = Vec::new();

        {
            let mut transport = StdioTransport::new(reader, &mut output);
            let cx = Cx::for_testing();

            for id in 1i64..=5 {
                let permit = transport.reserve_send(&cx).unwrap();
                let outgoing = JsonRpcRequest::new(format!("method_{id}"), None, id);
                permit.send_request(&outgoing).unwrap();
            }
        }

        let sent = String::from_utf8(output).unwrap();
        assert_eq!(sent.lines().count(), 5);

        // Each line must carry the method name matching its 1-based position.
        for (line, ordinal) in sent.lines().zip(1..) {
            let expected_method = format!("method_{ordinal}");
            assert!(line.contains(&expected_method));
        }
    }

    #[test]
    fn e2e_transport_close_flushes() {
        let reader = Cursor::new(Vec::new());
        let mut output = Vec::new();

        {
            let mut transport = StdioTransport::new(reader, &mut output);
            let cx = Cx::for_testing();

            let outgoing = JsonRpcRequest::new("test", None, 1i64);
            transport.send_request_direct(&cx, &outgoing).unwrap();

            transport.close().unwrap();
        }

        let sent = String::from_utf8(output).unwrap();
        assert!(!sent.is_empty());
        assert!(sent.contains("\"method\":\"test\""));
    }

    // ---- Response paths and trait-level send ---------------------------------

    #[test]
    fn send_response_direct_writes_valid_ndjson() {
        let reader = Cursor::new(Vec::new());
        let mut output = Vec::new();

        {
            let mut transport = StdioTransport::new(reader, &mut output);
            let cx = Cx::for_testing();

            let outgoing = JsonRpcResponse {
                jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
                result: Some(serde_json::json!({"ok": true})),
                error: None,
                id: Some(fastmcp_protocol::RequestId::Number(42)),
            };
            transport.send_response_direct(&cx, &outgoing).unwrap();
        }

        let sent = String::from_utf8(output).unwrap();
        assert!(sent.ends_with('\n'));
        assert!(sent.contains("\"result\""));
        assert!(sent.contains("\"ok\":true") || sent.contains("\"ok\": true"));
    }

    #[test]
    fn send_response_direct_cancelled() {
        let mut transport = transport_without_input();

        let cx = Cx::for_testing();
        cx.set_cancel_requested(true);

        let outgoing = JsonRpcResponse {
            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
            result: Some(serde_json::json!(null)),
            error: None,
            id: Some(fastmcp_protocol::RequestId::Number(1)),
        };
        let outcome = transport.send_response_direct(&cx, &outgoing);
        assert!(matches!(outcome, Err(TransportError::Cancelled)));
    }

    #[test]
    fn transport_send_trait_method_with_response() {
        let reader = Cursor::new(Vec::new());
        let mut output = Vec::new();

        {
            let mut transport = StdioTransport::new(reader, &mut output);
            let cx = Cx::for_testing();

            let outgoing = JsonRpcResponse {
                jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
                result: Some(serde_json::json!({"status": "done"})),
                error: None,
                id: Some(fastmcp_protocol::RequestId::Number(1)),
            };
            transport
                .send(&cx, &JsonRpcMessage::Response(outgoing))
                .unwrap();
        }

        let sent = String::from_utf8(output).unwrap();
        assert!(sent.contains("\"status\""));
    }

    #[test]
    fn transport_send_trait_cancelled() {
        let mut transport = transport_without_input();

        let cx = Cx::for_testing();
        cx.set_cancel_requested(true);

        let outgoing = JsonRpcRequest::new("test", None, 1i64);
        let outcome = transport.send(&cx, &JsonRpcMessage::Request(outgoing));
        assert!(matches!(outcome, Err(TransportError::Cancelled)));
    }

    #[test]
    fn two_phase_send_response_via_permit() {
        let reader = Cursor::new(Vec::new());
        let mut output = Vec::new();

        {
            let mut transport = StdioTransport::new(reader, &mut output);
            let cx = Cx::for_testing();

            let permit = transport.reserve_send(&cx).unwrap();
            let outgoing = JsonRpcResponse {
                jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
                result: Some(serde_json::json!({"v": 1})),
                error: None,
                id: Some(fastmcp_protocol::RequestId::Number(1)),
            };
            permit.send_response(&outgoing).unwrap();
        }

        let sent = String::from_utf8(output).unwrap();
        assert!(sent.contains("\"result\""));
    }

    #[test]
    fn recv_handles_crlf_line_endings() {
        let mut transport =
            transport_reading(b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\r\n");
        let cx = Cx::for_testing();

        let received = transport.recv(&cx).unwrap();
        let JsonRpcMessage::Request(request) = received else {
            panic!("Expected request");
        };
        assert_eq!(request.method, "test");
    }
}