1use std::io::{BufRead, BufReader, Read, Write};
42
43use asupersync::{Budget, Cx};
44use fastmcp_protocol::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
45
46use crate::async_io::{AsyncLineReader, AsyncStdout};
47use crate::{Codec, CodecError, SendPermit, Transport, TransportError, TwoPhaseTransport};
48
49pub struct StdioTransport<R, W> {
62 reader: BufReader<R>,
63 writer: W,
64 codec: Codec,
65 line_buffer: String,
66}
67
68impl<R: Read, W: Write> StdioTransport<R, W> {
69 #[must_use]
73 pub fn new(reader: R, writer: W) -> Self {
74 Self {
75 reader: BufReader::new(reader),
76 writer,
77 codec: Codec::new(),
78 line_buffer: String::with_capacity(4096),
79 }
80 }
81
82 fn write_message(&mut self, message: &JsonRpcMessage) -> Result<(), TransportError> {
84 let bytes = match message {
85 JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
86 JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
87 };
88 self.writer.write_all(&bytes)?;
89 self.writer.flush()?;
90 Ok(())
91 }
92
93 fn read_line(&mut self) -> Result<&str, TransportError> {
95 self.line_buffer.clear();
96 let bytes_read = self.reader.read_line(&mut self.line_buffer)?;
97
98 if bytes_read == 0 {
99 return Err(TransportError::Closed);
100 }
101
102 let line_len = {
104 let line = self
105 .line_buffer
106 .trim_end_matches('\n')
107 .trim_end_matches('\r');
108 line.len()
109 };
110 if line_len > self.codec.max_message_size() {
111 self.line_buffer.clear();
112 return Err(TransportError::Codec(CodecError::MessageTooLarge(line_len)));
113 }
114 let line = self
115 .line_buffer
116 .trim_end_matches('\n')
117 .trim_end_matches('\r');
118 Ok(line)
119 }
120}
121
122impl StdioTransport<std::io::Stdin, std::io::Stdout> {
123 #[must_use]
127 pub fn stdio() -> Self {
128 Self::new(std::io::stdin(), std::io::stdout())
129 }
130}
131
132impl<R: Read, W: Write> Transport for StdioTransport<R, W> {
133 fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
134 if cx.is_cancel_requested() {
136 return Err(TransportError::Cancelled);
137 }
138
139 self.write_message(message)
140 }
141
142 fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
143 if cx.is_cancel_requested() {
145 return Err(TransportError::Cancelled);
146 }
147
148 loop {
150 let line = self.read_line()?;
151
152 if line.is_empty() {
154 if cx.is_cancel_requested() {
156 return Err(TransportError::Cancelled);
157 }
158 continue;
159 }
160
161 let message: JsonRpcMessage = serde_json::from_str(line)
163 .map_err(|e| TransportError::Codec(crate::CodecError::Json(e)))?;
164
165 return Ok(message);
166 }
167 }
168
169 fn close(&mut self) -> Result<(), TransportError> {
170 self.writer.flush()?;
171 Ok(())
172 }
173}
174
175impl<R: Read, W: Write> StdioTransport<R, W> {
177 pub fn send_request_direct(
179 &mut self,
180 cx: &Cx,
181 request: &JsonRpcRequest,
182 ) -> Result<(), TransportError> {
183 if cx.is_cancel_requested() {
184 return Err(TransportError::Cancelled);
185 }
186 let bytes = self.codec.encode_request(request)?;
187 self.writer.write_all(&bytes)?;
188 self.writer.flush()?;
189 Ok(())
190 }
191
192 pub fn send_response_direct(
194 &mut self,
195 cx: &Cx,
196 response: &JsonRpcResponse,
197 ) -> Result<(), TransportError> {
198 if cx.is_cancel_requested() {
199 return Err(TransportError::Cancelled);
200 }
201 let bytes = self.codec.encode_response(response)?;
202 self.writer.write_all(&bytes)?;
203 self.writer.flush()?;
204 Ok(())
205 }
206}
207
208impl<R: Read, W: Write> TwoPhaseTransport for StdioTransport<R, W> {
209 type Writer = W;
210
211 fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError> {
212 if cx.is_cancel_requested() {
214 return Err(TransportError::Cancelled);
215 }
216
217 Ok(SendPermit::new(&mut self.writer, &self.codec))
219 }
220}
221
222pub struct AsyncStdioTransport {
261 reader: AsyncLineReader,
262 writer: AsyncStdout,
263 codec: Codec,
264}
265
266impl AsyncStdioTransport {
267 #[must_use]
272 pub fn new() -> Self {
273 Self {
274 reader: AsyncLineReader::new(),
275 writer: AsyncStdout::new(),
276 codec: Codec::new(),
277 }
278 }
279}
280
281impl Default for AsyncStdioTransport {
282 fn default() -> Self {
283 Self::new()
284 }
285}
286
287impl Transport for AsyncStdioTransport {
288 fn send(&mut self, cx: &Cx, message: &JsonRpcMessage) -> Result<(), TransportError> {
289 if cx.is_cancel_requested() {
291 return Err(TransportError::Cancelled);
292 }
293
294 let bytes = match message {
295 JsonRpcMessage::Request(req) => self.codec.encode_request(req)?,
296 JsonRpcMessage::Response(resp) => self.codec.encode_response(resp)?,
297 };
298
299 self.writer.write_all_sync(cx, &bytes).map_err(|e| {
301 if e.kind() == std::io::ErrorKind::Interrupted {
302 TransportError::Cancelled
303 } else {
304 TransportError::Io(e)
305 }
306 })?;
307
308 self.writer.flush_sync(cx).map_err(|e| {
309 if e.kind() == std::io::ErrorKind::Interrupted {
310 TransportError::Cancelled
311 } else {
312 TransportError::Io(e)
313 }
314 })?;
315
316 Ok(())
317 }
318
319 fn recv(&mut self, cx: &Cx) -> Result<JsonRpcMessage, TransportError> {
320 if cx.is_cancel_requested() {
322 return Err(TransportError::Cancelled);
323 }
324
325 let line = self
327 .reader
328 .read_non_empty_line(cx)
329 .map_err(|e| {
330 if e.kind() == std::io::ErrorKind::Interrupted {
331 TransportError::Cancelled
332 } else {
333 TransportError::Io(e)
334 }
335 })?
336 .ok_or(TransportError::Closed)?;
337
338 if line.len() > self.codec.max_message_size() {
339 return Err(TransportError::Codec(CodecError::MessageTooLarge(
340 line.len(),
341 )));
342 }
343
344 let message: JsonRpcMessage = serde_json::from_str(&line)
346 .map_err(|e| TransportError::Codec(crate::CodecError::Json(e)))?;
347
348 Ok(message)
349 }
350
351 fn close(&mut self) -> Result<(), TransportError> {
352 let cx = Cx::for_request_with_budget(Budget::INFINITE);
355 self.writer.flush_sync(&cx)?;
356 Ok(())
357 }
358}
359
360impl AsyncStdioTransport {
361 pub fn send_request_direct(
363 &mut self,
364 cx: &Cx,
365 request: &JsonRpcRequest,
366 ) -> Result<(), TransportError> {
367 if cx.is_cancel_requested() {
368 return Err(TransportError::Cancelled);
369 }
370
371 let bytes = self.codec.encode_request(request)?;
372
373 self.writer.write_all_sync(cx, &bytes).map_err(|e| {
374 if e.kind() == std::io::ErrorKind::Interrupted {
375 TransportError::Cancelled
376 } else {
377 TransportError::Io(e)
378 }
379 })?;
380
381 self.writer.flush_sync(cx).map_err(|e| {
382 if e.kind() == std::io::ErrorKind::Interrupted {
383 TransportError::Cancelled
384 } else {
385 TransportError::Io(e)
386 }
387 })
388 }
389
390 pub fn send_response_direct(
392 &mut self,
393 cx: &Cx,
394 response: &JsonRpcResponse,
395 ) -> Result<(), TransportError> {
396 if cx.is_cancel_requested() {
397 return Err(TransportError::Cancelled);
398 }
399
400 let bytes = self.codec.encode_response(response)?;
401
402 self.writer.write_all_sync(cx, &bytes).map_err(|e| {
403 if e.kind() == std::io::ErrorKind::Interrupted {
404 TransportError::Cancelled
405 } else {
406 TransportError::Io(e)
407 }
408 })?;
409
410 self.writer.flush_sync(cx).map_err(|e| {
411 if e.kind() == std::io::ErrorKind::Interrupted {
412 TransportError::Cancelled
413 } else {
414 TransportError::Io(e)
415 }
416 })
417 }
418}
419
420impl TwoPhaseTransport for AsyncStdioTransport {
421 type Writer = AsyncStdout;
422
423 fn reserve_send(&mut self, cx: &Cx) -> Result<SendPermit<'_, Self::Writer>, TransportError> {
424 if cx.is_cancel_requested() {
426 return Err(TransportError::Cancelled);
427 }
428
429 Ok(SendPermit::new(&mut self.writer, &self.codec))
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use super::*;
438 use std::io::Cursor;
439
440 #[test]
441 fn test_send_receive_roundtrip() {
442 let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n";
444 let reader = Cursor::new(input.to_vec());
445 let writer = Vec::new();
446
447 let mut transport = StdioTransport::new(reader, writer);
448
449 let cx = Cx::for_testing();
451 let msg = transport.recv(&cx).unwrap();
452 assert!(matches!(&msg, JsonRpcMessage::Request(_)));
453 if let JsonRpcMessage::Request(req) = msg {
454 assert_eq!(req.method, "test");
455 }
456 }
457
458 #[test]
459 fn test_send_message() {
460 let reader = Cursor::new(Vec::new());
461 let writer = Vec::new();
462
463 let mut transport = StdioTransport::new(reader, writer);
464
465 let cx = Cx::for_testing();
466 let request = JsonRpcRequest::new("test/method", None, 1i64);
467 transport.send_request_direct(&cx, &request).unwrap();
468 }
469
470 #[test]
471 fn test_eof_returns_closed() {
472 let reader = Cursor::new(Vec::new());
474 let writer = Vec::new();
475
476 let mut transport = StdioTransport::new(reader, writer);
477
478 let cx = Cx::for_testing();
479 let result = transport.recv(&cx);
480 assert!(matches!(result, Err(TransportError::Closed)));
481 }
482
483 #[test]
484 fn test_skip_empty_lines() {
485 let input = b"\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n";
487 let reader = Cursor::new(input.to_vec());
488 let writer = Vec::new();
489
490 let mut transport = StdioTransport::new(reader, writer);
491
492 let cx = Cx::for_testing();
493 let msg = transport.recv(&cx).unwrap();
494 assert!(matches!(&msg, JsonRpcMessage::Request(_)));
495 if let JsonRpcMessage::Request(req) = msg {
496 assert_eq!(req.method, "test");
497 }
498 }
499
500 #[test]
501 fn test_recv_rejects_oversized_line() {
502 let request = JsonRpcRequest::new("test/method", None, 1i64);
503 let line = serde_json::to_vec(&request).unwrap();
504 let mut input = line.clone();
505 input.push(b'\n');
506 let reader = Cursor::new(input);
507 let writer = Vec::new();
508
509 let mut transport = StdioTransport::new(reader, writer);
510 transport
511 .codec
512 .set_max_message_size(line.len().saturating_sub(1));
513
514 let cx = Cx::for_testing();
515 let result = transport.recv(&cx);
516 assert!(matches!(
517 result,
518 Err(TransportError::Codec(CodecError::MessageTooLarge(_)))
519 ));
520 }
521
522 #[test]
523 fn test_cancellation_on_recv() {
524 let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"id\":1}\n";
525 let reader = Cursor::new(input.to_vec());
526 let writer = Vec::new();
527
528 let mut transport = StdioTransport::new(reader, writer);
529
530 let cx = Cx::for_testing();
531 cx.set_cancel_requested(true);
532
533 let result = transport.recv(&cx);
534 assert!(matches!(result, Err(TransportError::Cancelled)));
535 }
536
537 #[test]
538 fn test_cancellation_on_send() {
539 let reader = Cursor::new(Vec::new());
540 let writer = Vec::new();
541
542 let mut transport = StdioTransport::new(reader, writer);
543
544 let cx = Cx::for_testing();
545 cx.set_cancel_requested(true);
546
547 let request = JsonRpcRequest::new("test/method", None, 1i64);
548 let result = transport.send_request_direct(&cx, &request);
549 assert!(matches!(result, Err(TransportError::Cancelled)));
550 }
551
552 #[test]
553 fn test_two_phase_send_success() {
554 let reader = Cursor::new(Vec::new());
555 let writer = Vec::new();
556
557 let mut transport = StdioTransport::new(reader, writer);
558
559 let cx = Cx::for_testing();
560
561 let permit = transport.reserve_send(&cx).unwrap();
563
564 let request = JsonRpcRequest::new("test/method", None, 1i64);
566 permit.send_request(&request).unwrap();
567 }
568
569 #[test]
570 fn test_two_phase_send_cancellation_on_reserve() {
571 let reader = Cursor::new(Vec::new());
572 let writer = Vec::new();
573
574 let mut transport = StdioTransport::new(reader, writer);
575
576 let cx = Cx::for_testing();
577 cx.set_cancel_requested(true);
578
579 let result = transport.reserve_send(&cx);
581 assert!(matches!(result, Err(TransportError::Cancelled)));
582 }
583
584 #[test]
585 fn test_two_phase_send_message() {
586 let reader = Cursor::new(Vec::new());
587 let writer = Vec::new();
588
589 let mut transport = StdioTransport::new(reader, writer);
590
591 let cx = Cx::for_testing();
592
593 let permit = transport.reserve_send(&cx).unwrap();
595 let request = JsonRpcRequest::new("test/method", None, 1i64);
596 let message = JsonRpcMessage::Request(request);
597 permit.send(&message).unwrap();
598 }
599
600 #[test]
605 fn e2e_ndjson_multiple_messages_in_sequence() {
606 let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"init\",\"id\":1}\n\
608 {\"jsonrpc\":\"2.0\",\"method\":\"tools/list\",\"id\":2}\n\
609 {\"jsonrpc\":\"2.0\",\"method\":\"tools/call\",\"params\":{\"name\":\"test\"},\"id\":3}\n";
610
611 let reader = Cursor::new(input.to_vec());
612 let writer = Vec::new();
613 let mut transport = StdioTransport::new(reader, writer);
614 let cx = Cx::for_testing();
615
616 let msg1 = transport.recv(&cx).unwrap();
618 match msg1 {
619 JsonRpcMessage::Request(req) => assert_eq!(req.method, "init"),
620 _ => panic!("Expected request"),
621 }
622
623 let msg2 = transport.recv(&cx).unwrap();
625 match msg2 {
626 JsonRpcMessage::Request(req) => assert_eq!(req.method, "tools/list"),
627 _ => panic!("Expected request"),
628 }
629
630 let msg3 = transport.recv(&cx).unwrap();
632 match msg3 {
633 JsonRpcMessage::Request(req) => {
634 assert_eq!(req.method, "tools/call");
635 assert!(req.params.is_some());
636 }
637 _ => panic!("Expected request"),
638 }
639
640 let result = transport.recv(&cx);
642 assert!(matches!(result, Err(TransportError::Closed)));
643 }
644
645 #[test]
646 fn e2e_ndjson_request_response_flow() {
647 let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"success\":true},\"id\":1}\n";
649
650 let reader = Cursor::new(input.to_vec());
651 let mut output = Vec::new();
652 let mut transport = StdioTransport::new(reader, Cursor::new(&mut output));
653 let cx = Cx::for_testing();
654
655 let request = JsonRpcRequest::new(
657 "test/method",
658 Some(serde_json::json!({"key": "value"})),
659 1i64,
660 );
661 transport.send_request_direct(&cx, &request).unwrap();
662
663 let msg = transport.recv(&cx).unwrap();
665 match msg {
666 JsonRpcMessage::Response(resp) => {
667 assert!(resp.result.is_some());
668 assert!(resp.error.is_none());
669 }
670 _ => panic!("Expected response"),
671 }
672 }
673
674 #[test]
675 fn e2e_ndjson_handles_mixed_empty_lines() {
676 let input = b"\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test1\",\"id\":1}\n\n\n{\"jsonrpc\":\"2.0\",\"method\":\"test2\",\"id\":2}\n\n";
678
679 let reader = Cursor::new(input.to_vec());
680 let writer = Vec::new();
681 let mut transport = StdioTransport::new(reader, writer);
682 let cx = Cx::for_testing();
683
684 let msg1 = transport.recv(&cx).unwrap();
686 match msg1 {
687 JsonRpcMessage::Request(req) => assert_eq!(req.method, "test1"),
688 _ => panic!("Expected request"),
689 }
690
691 let msg2 = transport.recv(&cx).unwrap();
692 match msg2 {
693 JsonRpcMessage::Request(req) => assert_eq!(req.method, "test2"),
694 _ => panic!("Expected request"),
695 }
696 }
697
698 #[test]
699 fn e2e_ndjson_handles_unicode_content() {
700 let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{\"message\":\"\xC3\xA9\xC3\xA8\xC3\xAA\xE4\xB8\xAD\xE6\x96\x87\xF0\x9F\x91\x8B\"},\"id\":1}\n";
702
703 let reader = Cursor::new(input.to_vec());
704 let writer = Vec::new();
705 let mut transport = StdioTransport::new(reader, writer);
706 let cx = Cx::for_testing();
707
708 let msg = transport.recv(&cx).unwrap();
709 match msg {
710 JsonRpcMessage::Request(req) => {
711 assert_eq!(req.method, "test");
712 let params = req.params.as_ref().unwrap();
713 let message = params.get("message").unwrap().as_str().unwrap();
714 assert!(message.contains("é"));
716 assert!(message.contains("中"));
717 assert!(message.contains("👋"));
718 }
719 _ => panic!("Expected request"),
720 }
721 }
722
723 #[test]
724 fn e2e_ndjson_large_message() {
725 let large_data = "x".repeat(100_000);
727 let message = format!(
728 "{{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{{\"data\":\"{}\"}},\"id\":1}}\n",
729 large_data
730 );
731
732 let reader = Cursor::new(message.into_bytes());
733 let writer = Vec::new();
734 let mut transport = StdioTransport::new(reader, writer);
735 let cx = Cx::for_testing();
736
737 let msg = transport.recv(&cx).unwrap();
738 match msg {
739 JsonRpcMessage::Request(req) => {
740 assert_eq!(req.method, "test");
741 let params = req.params.as_ref().unwrap();
742 let data = params.get("data").unwrap().as_str().unwrap();
743 assert_eq!(data.len(), 100_000);
744 }
745 _ => panic!("Expected request"),
746 }
747 }
748
749 #[test]
750 fn e2e_ndjson_notification() {
751 let input = b"{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}\n";
753
754 let reader = Cursor::new(input.to_vec());
755 let writer = Vec::new();
756 let mut transport = StdioTransport::new(reader, writer);
757 let cx = Cx::for_testing();
758
759 let msg = transport.recv(&cx).unwrap();
760 match msg {
761 JsonRpcMessage::Request(req) => {
762 assert_eq!(req.method, "notifications/initialized");
763 assert!(req.id.is_none());
764 }
765 _ => panic!("Expected request/notification"),
766 }
767 }
768
769 #[test]
770 fn e2e_ndjson_error_response() {
771 let input = b"{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32601,\"message\":\"Method not found\"},\"id\":1}\n";
773
774 let reader = Cursor::new(input.to_vec());
775 let writer = Vec::new();
776 let mut transport = StdioTransport::new(reader, writer);
777 let cx = Cx::for_testing();
778
779 let msg = transport.recv(&cx).unwrap();
780 match msg {
781 JsonRpcMessage::Response(resp) => {
782 assert!(resp.result.is_none());
783 assert!(resp.error.is_some());
784 let error = resp.error.unwrap();
785 assert_eq!(error.code, -32601);
786 assert_eq!(error.message, "Method not found");
787 }
788 _ => panic!("Expected response"),
789 }
790 }
791
792 #[test]
793 fn e2e_ndjson_malformed_json_error() {
794 let input = b"{invalid json\n";
796
797 let reader = Cursor::new(input.to_vec());
798 let writer = Vec::new();
799 let mut transport = StdioTransport::new(reader, writer);
800 let cx = Cx::for_testing();
801
802 let result = transport.recv(&cx);
803 assert!(matches!(result, Err(TransportError::Codec(_))));
804 }
805
806 #[test]
807 fn e2e_ndjson_bidirectional_flow() {
808 let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[]},\"id\":1}\n";
810 let reader = Cursor::new(input.to_vec());
811 let mut output = Vec::new();
812
813 {
815 let mut transport = StdioTransport::new(reader, &mut output);
816 let cx = Cx::for_testing();
817
818 let request = JsonRpcRequest::new("tools/list", None, 1i64);
820 transport.send_request_direct(&cx, &request).unwrap();
821
822 let msg = transport.recv(&cx).unwrap();
824 assert!(matches!(msg, JsonRpcMessage::Response(_)));
825 }
826
827 let sent = String::from_utf8(output).unwrap();
829 assert!(sent.ends_with('\n'));
830 assert!(sent.contains("\"method\":\"tools/list\""));
831 assert!(sent.contains("\"jsonrpc\":\"2.0\""));
832 }
833
834 #[test]
835 fn e2e_ndjson_response_with_complex_result() {
836 let input = b"{\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[{\"name\":\"tool1\",\"description\":\"A test tool\",\"inputSchema\":{\"type\":\"object\"}}]},\"id\":1}\n";
838
839 let reader = Cursor::new(input.to_vec());
840 let writer = Vec::new();
841 let mut transport = StdioTransport::new(reader, writer);
842 let cx = Cx::for_testing();
843
844 let msg = transport.recv(&cx).unwrap();
845 match msg {
846 JsonRpcMessage::Response(resp) => {
847 let result = resp.result.unwrap();
848 let tools = result.get("tools").unwrap().as_array().unwrap();
849 assert_eq!(tools.len(), 1);
850 assert_eq!(tools[0].get("name").unwrap(), "tool1");
851 }
852 _ => panic!("Expected response"),
853 }
854 }
855
856 #[test]
857 fn e2e_two_phase_send_multiple_messages() {
858 let reader = Cursor::new(Vec::new());
860 let mut output = Vec::new();
861
862 {
863 let mut transport = StdioTransport::new(reader, &mut output);
864 let cx = Cx::for_testing();
865
866 for i in 1..=5 {
868 let permit = transport.reserve_send(&cx).unwrap();
869 let request = JsonRpcRequest::new(format!("method_{i}"), None, i as i64);
870 permit.send_request(&request).unwrap();
871 }
872 }
873
874 let sent = String::from_utf8(output).unwrap();
876 let lines: Vec<&str> = sent.lines().collect();
877 assert_eq!(lines.len(), 5);
878
879 for (i, line) in lines.iter().enumerate() {
880 let expected_method = format!("method_{}", i + 1);
881 assert!(line.contains(&expected_method));
882 }
883 }
884
885 #[test]
886 fn e2e_transport_close_flushes() {
887 let reader = Cursor::new(Vec::new());
888 let mut output = Vec::new();
889
890 {
891 let mut transport = StdioTransport::new(reader, &mut output);
892 let cx = Cx::for_testing();
893
894 let request = JsonRpcRequest::new("test", None, 1i64);
896 transport.send_request_direct(&cx, &request).unwrap();
897
898 transport.close().unwrap();
900 }
901
902 let sent = String::from_utf8(output).unwrap();
904 assert!(!sent.is_empty());
905 assert!(sent.contains("\"method\":\"test\""));
906 }
907}