buildkit_frontend/
stdio.rs1use std::io::{self, stdin, stdout};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use pin_project::pin_project;
6use tokio::io::*;
7use tonic::transport::Uri;
8
9#[pin_project]
10pub struct StdioSocket {
11 #[pin]
12 reader: PollEvented<async_stdio::EventedStdin>,
13
14 #[pin]
15 writer: PollEvented<async_stdio::EventedStdout>,
16}
17
18pub async fn stdio_connector(_: Uri) -> io::Result<StdioSocket> {
19 StdioSocket::try_new()
20}
21
22impl StdioSocket {
23 pub fn try_new() -> io::Result<Self> {
24 Ok(StdioSocket {
25 reader: PollEvented::new(async_stdio::EventedStdin::try_new(stdin())?)?,
26 writer: PollEvented::new(async_stdio::EventedStdout::try_new(stdout())?)?,
27 })
28 }
29}
30
31impl AsyncRead for StdioSocket {
32 fn poll_read(
33 self: Pin<&mut Self>,
34 cx: &mut Context<'_>,
35 buf: &mut [u8],
36 ) -> Poll<Result<usize>> {
37 self.project().reader.poll_read(cx, buf)
38 }
39}
40
41impl AsyncWrite for StdioSocket {
42 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
43 self.project().writer.poll_write(cx, buf)
44 }
45
46 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
47 self.project().writer.poll_flush(cx)
48 }
49
50 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
51 self.project().writer.poll_shutdown(cx)
52 }
53}
54
55mod async_stdio {
56 use std::io::{self, Read, Stdin, Stdout, Write};
57 use std::os::unix::io::AsRawFd;
58
59 use mio::event::Evented;
60 use mio::unix::EventedFd;
61 use mio::{Poll, PollOpt, Ready, Token};
62
63 use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK};
64
65 pub struct EventedStdin(Stdin);
66 pub struct EventedStdout(Stdout);
67
68 impl EventedStdin {
69 pub fn try_new(stdin: Stdin) -> io::Result<Self> {
70 set_non_blocking_flag(&stdin)?;
71
72 Ok(EventedStdin(stdin))
73 }
74 }
75
76 impl EventedStdout {
77 pub fn try_new(stdout: Stdout) -> io::Result<Self> {
78 set_non_blocking_flag(&stdout)?;
79
80 Ok(EventedStdout(stdout))
81 }
82 }
83
84 impl Evented for EventedStdin {
85 fn register(
86 &self,
87 poll: &Poll,
88 token: Token,
89 interest: Ready,
90 opts: PollOpt,
91 ) -> io::Result<()> {
92 EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts)
93 }
94
95 fn reregister(
96 &self,
97 poll: &Poll,
98 token: Token,
99 interest: Ready,
100 opts: PollOpt,
101 ) -> io::Result<()> {
102 EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts)
103 }
104
105 fn deregister(&self, poll: &Poll) -> io::Result<()> {
106 EventedFd(&self.0.as_raw_fd()).deregister(poll)
107 }
108 }
109
110 impl Read for EventedStdin {
111 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
112 self.0.read(buf)
113 }
114 }
115
116 impl Evented for EventedStdout {
117 fn register(
118 &self,
119 poll: &Poll,
120 token: Token,
121 interest: Ready,
122 opts: PollOpt,
123 ) -> io::Result<()> {
124 EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts)
125 }
126
127 fn reregister(
128 &self,
129 poll: &Poll,
130 token: Token,
131 interest: Ready,
132 opts: PollOpt,
133 ) -> io::Result<()> {
134 EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts)
135 }
136
137 fn deregister(&self, poll: &Poll) -> io::Result<()> {
138 EventedFd(&self.0.as_raw_fd()).deregister(poll)
139 }
140 }
141
142 impl Write for EventedStdout {
143 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
144 self.0.write(buf)
145 }
146
147 fn flush(&mut self) -> io::Result<()> {
148 self.0.flush()
149 }
150 }
151
152 fn set_non_blocking_flag<T: AsRawFd>(stream: &T) -> io::Result<()> {
153 let flags = unsafe { fcntl(stream.as_raw_fd(), F_GETFL, 0) };
154
155 if flags < 0 {
156 return Err(std::io::Error::last_os_error());
157 }
158
159 if unsafe { fcntl(stream.as_raw_fd(), F_SETFL, flags | O_NONBLOCK) } != 0 {
160 return Err(std::io::Error::last_os_error());
161 }
162
163 Ok(())
164 }
165}