buildkit_frontend/
stdio.rs

1use std::io::{self, stdin, stdout};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use pin_project::pin_project;
6use tokio::io::*;
7use tonic::transport::Uri;
8
9#[pin_project]
10pub struct StdioSocket {
11    #[pin]
12    reader: PollEvented<async_stdio::EventedStdin>,
13
14    #[pin]
15    writer: PollEvented<async_stdio::EventedStdout>,
16}
17
18pub async fn stdio_connector(_: Uri) -> io::Result<StdioSocket> {
19    StdioSocket::try_new()
20}
21
22impl StdioSocket {
23    pub fn try_new() -> io::Result<Self> {
24        Ok(StdioSocket {
25            reader: PollEvented::new(async_stdio::EventedStdin::try_new(stdin())?)?,
26            writer: PollEvented::new(async_stdio::EventedStdout::try_new(stdout())?)?,
27        })
28    }
29}
30
31impl AsyncRead for StdioSocket {
32    fn poll_read(
33        self: Pin<&mut Self>,
34        cx: &mut Context<'_>,
35        buf: &mut [u8],
36    ) -> Poll<Result<usize>> {
37        self.project().reader.poll_read(cx, buf)
38    }
39}
40
41impl AsyncWrite for StdioSocket {
42    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
43        self.project().writer.poll_write(cx, buf)
44    }
45
46    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
47        self.project().writer.poll_flush(cx)
48    }
49
50    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
51        self.project().writer.poll_shutdown(cx)
52    }
53}
54
55mod async_stdio {
56    use std::io::{self, Read, Stdin, Stdout, Write};
57    use std::os::unix::io::AsRawFd;
58
59    use mio::event::Evented;
60    use mio::unix::EventedFd;
61    use mio::{Poll, PollOpt, Ready, Token};
62
63    use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK};
64
65    pub struct EventedStdin(Stdin);
66    pub struct EventedStdout(Stdout);
67
68    impl EventedStdin {
69        pub fn try_new(stdin: Stdin) -> io::Result<Self> {
70            set_non_blocking_flag(&stdin)?;
71
72            Ok(EventedStdin(stdin))
73        }
74    }
75
76    impl EventedStdout {
77        pub fn try_new(stdout: Stdout) -> io::Result<Self> {
78            set_non_blocking_flag(&stdout)?;
79
80            Ok(EventedStdout(stdout))
81        }
82    }
83
84    impl Evented for EventedStdin {
85        fn register(
86            &self,
87            poll: &Poll,
88            token: Token,
89            interest: Ready,
90            opts: PollOpt,
91        ) -> io::Result<()> {
92            EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts)
93        }
94
95        fn reregister(
96            &self,
97            poll: &Poll,
98            token: Token,
99            interest: Ready,
100            opts: PollOpt,
101        ) -> io::Result<()> {
102            EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts)
103        }
104
105        fn deregister(&self, poll: &Poll) -> io::Result<()> {
106            EventedFd(&self.0.as_raw_fd()).deregister(poll)
107        }
108    }
109
110    impl Read for EventedStdin {
111        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
112            self.0.read(buf)
113        }
114    }
115
116    impl Evented for EventedStdout {
117        fn register(
118            &self,
119            poll: &Poll,
120            token: Token,
121            interest: Ready,
122            opts: PollOpt,
123        ) -> io::Result<()> {
124            EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts)
125        }
126
127        fn reregister(
128            &self,
129            poll: &Poll,
130            token: Token,
131            interest: Ready,
132            opts: PollOpt,
133        ) -> io::Result<()> {
134            EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts)
135        }
136
137        fn deregister(&self, poll: &Poll) -> io::Result<()> {
138            EventedFd(&self.0.as_raw_fd()).deregister(poll)
139        }
140    }
141
142    impl Write for EventedStdout {
143        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
144            self.0.write(buf)
145        }
146
147        fn flush(&mut self) -> io::Result<()> {
148            self.0.flush()
149        }
150    }
151
152    fn set_non_blocking_flag<T: AsRawFd>(stream: &T) -> io::Result<()> {
153        let flags = unsafe { fcntl(stream.as_raw_fd(), F_GETFL, 0) };
154
155        if flags < 0 {
156            return Err(std::io::Error::last_os_error());
157        }
158
159        if unsafe { fcntl(stream.as_raw_fd(), F_SETFL, flags | O_NONBLOCK) } != 0 {
160            return Err(std::io::Error::last_os_error());
161        }
162
163        Ok(())
164    }
165}