techne_server/
stdio.rs

1use crate::mcp::Bytes;
2use crate::transport::{Action, Result, Transport};
3
4use futures::channel::mpsc;
5use futures::channel::oneshot;
6use futures::{SinkExt, StreamExt};
7use tokio::io::{self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
8use tokio::task;
9
10pub struct Stdio {
11    input: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
12    output: mpsc::Sender<Result>,
13}
14
15impl Stdio {
16    pub fn current() -> Self {
17        Stdio::custom(io::stdin(), io::stdout())
18    }
19
20    pub fn custom(
21        input: impl AsyncRead + Send + Unpin + 'static,
22        mut output: impl AsyncWrite + Send + Unpin + 'static,
23    ) -> Self {
24        let (sender, mut receiver) = mpsc::channel(10);
25
26        drop(task::spawn(async move {
27            while let Some(action) = receiver.next().await {
28                match action {
29                    Result::Send(bytes) => write(&bytes, &mut output).await?,
30                    Result::Stream(mut stream) => {
31                        while let Some(bytes) = stream.next().await {
32                            write(&bytes, &mut output).await?
33                        }
34                    }
35                    Result::Accept | Result::Reject | Result::Unsupported => {}
36                }
37            }
38
39            Ok::<(), io::Error>(())
40        }));
41
42        Self {
43            input: BufReader::new(Box::new(input)),
44            output: sender,
45        }
46    }
47}
48
49impl Transport for Stdio {
50    async fn accept(&mut self) -> io::Result<Action> {
51        let mut line = Vec::new();
52
53        if self.input.read_until(0xA, &mut line).await? == 0 {
54            return Ok(Action::Quit);
55        }
56
57        let mut output = self.output.clone();
58        let (sender, receiver) = oneshot::channel();
59
60        task::spawn(async move {
61            if let Ok(result) = receiver.await {
62                let _ = output.send(result).await;
63            }
64        });
65
66        Ok(Action::Handle(Bytes::from_owner(line), sender))
67    }
68}
69
70async fn write(data: &[u8], writer: &mut (dyn AsyncWrite + Send + Unpin)) -> io::Result<()> {
71    writer.write_all(data).await?;
72    writer.write_u8(0xA).await?;
73    writer.flush().await
74}