1use crate::mcp::Bytes;
2use crate::transport::{Action, Result, Transport};
3
4use futures::channel::mpsc;
5use futures::channel::oneshot;
6use futures::{SinkExt, StreamExt};
7use tokio::io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
8use tokio::task;
9
10pub struct Stdio {
11 input: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
12 output: mpsc::Sender<Result>,
13}
14
15impl Stdio {
16 pub fn current() -> Self {
17 Stdio::custom(io::stdin(), io::stdout())
18 }
19
20 pub fn custom(
21 input: impl AsyncRead + Send + Unpin + 'static,
22 mut output: impl AsyncWrite + Send + Unpin + 'static,
23 ) -> Self {
24 let (sender, mut receiver) = mpsc::channel(10);
25
26 drop(task::spawn(async move {
27 while let Some(action) = receiver.next().await {
28 match action {
29 Result::Send(bytes) => write(&bytes, &mut output).await?,
30 Result::Stream(mut stream) => {
31 while let Some(bytes) = stream.next().await {
32 write(&bytes, &mut output).await?
33 }
34 }
35 Result::Accept | Result::Reject | Result::Unsupported => {}
36 }
37 }
38
39 Ok::<(), io::Error>(())
40 }));
41
42 Self {
43 input: BufReader::new(Box::new(input)),
44 output: sender,
45 }
46 }
47}
48
49impl Transport for Stdio {
50 async fn accept(&mut self) -> io::Result<Action> {
51 let mut line = Vec::new();
52
53 if self.input.read_until(0xA, &mut line).await? == 0 {
54 return Ok(Action::Quit);
55 }
56
57 let mut output = self.output.clone();
58 let (sender, receiver) = oneshot::channel();
59
60 task::spawn(async move {
61 if let Ok(result) = receiver.await {
62 let _ = output.send(result).await;
63 }
64 });
65
66 Ok(Action::Handle(Bytes::from_owner(line), sender))
67 }
68}
69
70async fn write(data: &[u8], writer: &mut (dyn AsyncWrite + Send + Unpin)) -> io::Result<()> {
71 writer.write_all(data).await?;
72 writer.write_u8(0xA).await?;
73 writer.flush().await
74}