1use std::{
2 io::{self, IsTerminal, Read, Write},
3 os::windows::io::{AsRawHandle, BorrowedHandle, RawHandle},
4 pin::Pin,
5 sync::OnceLock,
6 task::Poll,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10use compio_driver::{
11 AsFd, AsRawFd, BorrowedFd, OpCode, OpType, RawFd, SharedFd,
12 op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send},
13};
14use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
15use compio_runtime::{BorrowedBuffer, BufferPool, Runtime};
16use windows_sys::Win32::System::IO::OVERLAPPED;
17
18#[cfg(doc)]
19use super::{stderr, stdin, stdout};
20
21struct StdRead<R: Read, B: IoBufMut> {
22 reader: R,
23 buffer: B,
24}
25
26impl<R: Read, B: IoBufMut> StdRead<R, B> {
27 pub fn new(reader: R, buffer: B) -> Self {
28 Self { reader, buffer }
29 }
30}
31
32impl<R: Read, B: IoBufMut> OpCode for StdRead<R, B> {
33 fn op_type(&self) -> OpType {
34 OpType::Blocking
35 }
36
37 unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
38 let this = unsafe { self.get_unchecked_mut() };
39 let slice = this.buffer.as_mut_slice();
40 #[cfg(feature = "read_buf")]
41 {
42 let mut buf = io::BorrowedBuf::from(slice);
43 let mut cursor = buf.unfilled();
44 this.reader.read_buf(cursor.reborrow())?;
45 Poll::Ready(Ok(cursor.written()))
46 }
47 #[cfg(not(feature = "read_buf"))]
48 {
49 use std::mem::MaybeUninit;
50
51 slice.fill(MaybeUninit::new(0));
52 this.reader
53 .read(std::slice::from_raw_parts_mut(
54 this.buffer.as_buf_mut_ptr(),
55 this.buffer.buf_capacity(),
56 ))
57 .into()
58 }
59 }
60}
61
62impl<R: Read, B: IoBufMut> IntoInner for StdRead<R, B> {
63 type Inner = B;
64
65 fn into_inner(self) -> Self::Inner {
66 self.buffer
67 }
68}
69
70struct StdWrite<W: Write, B: IoBuf> {
71 writer: W,
72 buffer: B,
73}
74
75impl<W: Write, B: IoBuf> StdWrite<W, B> {
76 pub fn new(writer: W, buffer: B) -> Self {
77 Self { writer, buffer }
78 }
79}
80
81impl<W: Write, B: IoBuf> OpCode for StdWrite<W, B> {
82 fn op_type(&self) -> OpType {
83 OpType::Blocking
84 }
85
86 unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
87 let this = unsafe { self.get_unchecked_mut() };
88 let slice = this.buffer.as_slice();
89 this.writer.write(slice).into()
90 }
91}
92
93impl<W: Write, B: IoBuf> IntoInner for StdWrite<W, B> {
94 type Inner = B;
95
96 fn into_inner(self) -> Self::Inner {
97 self.buffer
98 }
99}
100
/// Thin wrapper around a raw Windows handle.
///
/// In this file it is only ever built from the raw handles of std's
/// stdin/stdout/stderr. No `Drop` impl is visible here, so presumably the
/// wrapped handle is never closed by this type — confirm before reusing it
/// for owned handles.
#[derive(Debug)]
struct StaticFd(RawHandle);
103
104impl AsFd for StaticFd {
105 fn as_fd(&self) -> BorrowedFd<'_> {
106 BorrowedFd::File(unsafe { BorrowedHandle::borrow_raw(self.0) })
108 }
109}
110
111impl AsRawFd for StaticFd {
112 fn as_raw_fd(&self) -> RawFd {
113 self.0 as _
114 }
115}
116
/// Process-wide cache of the stdin probe performed by `Stdin::new`: `true`
/// when stdin is a terminal or could not be attached to the runtime.
static STDIN_ISATTY: OnceLock<bool> = OnceLock::new();
118
/// Asynchronous handle to the standard input of the current process.
///
/// See also [`stdin`].
#[derive(Debug, Clone)]
pub struct Stdin {
    // Shared wrapper around the raw stdin handle, used by the non-tty path.
    fd: SharedFd<StaticFd>,
    // `true` selects the blocking `std::io::stdin()` path: set when stdin is a
    // terminal or when attaching the handle to the runtime failed.
    isatty: bool,
}
127
128impl Stdin {
129 pub(crate) fn new() -> Self {
130 let stdin = io::stdin();
131 let isatty = *STDIN_ISATTY.get_or_init(|| {
132 stdin.is_terminal()
133 || Runtime::with_current(|r| r.attach(stdin.as_raw_handle() as _)).is_err()
134 });
135 Self {
136 fd: SharedFd::new(StaticFd(stdin.as_raw_handle())),
137 isatty,
138 }
139 }
140}
141
142impl AsyncRead for Stdin {
143 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
144 if self.isatty {
145 let op = StdRead::new(io::stdin(), buf);
146 compio_runtime::submit(op).await.into_inner()
147 } else {
148 let op = Recv::new(self.fd.clone(), buf);
149 compio_runtime::submit(op).await.into_inner()
150 }
151 .map_advanced()
152 }
153}
154
155impl AsyncReadManaged for Stdin {
156 type Buffer<'a> = BorrowedBuffer<'a>;
157 type BufferPool = BufferPool;
158
159 async fn read_managed<'a>(
160 &mut self,
161 buffer_pool: &'a Self::BufferPool,
162 len: usize,
163 ) -> io::Result<Self::Buffer<'a>> {
164 (&*self).read_managed(buffer_pool, len).await
165 }
166}
167
168impl AsyncReadManaged for &Stdin {
169 type Buffer<'a> = BorrowedBuffer<'a>;
170 type BufferPool = BufferPool;
171
172 async fn read_managed<'a>(
173 &mut self,
174 buffer_pool: &'a Self::BufferPool,
175 len: usize,
176 ) -> io::Result<Self::Buffer<'a>> {
177 let buffer_pool = buffer_pool.try_inner()?;
178 if self.isatty {
179 let buf = buffer_pool.get_buffer(len)?;
180 let op = StdRead::new(io::stdin(), buf);
181 let BufResult(res, buf) = compio_runtime::submit(op).await.into_inner();
182 let res = unsafe { buffer_pool.create_proxy(buf, res?) };
183 Ok(res)
184 } else {
185 let op = RecvManaged::new(self.fd.clone(), buffer_pool, len)?;
186 compio_runtime::submit_with_flags(op)
187 .await
188 .take_buffer(buffer_pool)
189 }
190 }
191}
192
193impl AsRawFd for Stdin {
194 fn as_raw_fd(&self) -> RawFd {
195 self.fd.as_raw_fd()
196 }
197}
198
/// Process-wide cache of the stdout probe performed by `Stdout::new`: `true`
/// when stdout is a terminal or could not be attached to the runtime.
static STDOUT_ISATTY: OnceLock<bool> = OnceLock::new();
200
/// Asynchronous handle to the standard output of the current process.
///
/// See also [`stdout`].
#[derive(Debug, Clone)]
pub struct Stdout {
    // Shared wrapper around the raw stdout handle, used by the non-tty path.
    fd: SharedFd<StaticFd>,
    // `true` selects the blocking `std::io::stdout()` path: set when stdout is
    // a terminal or when attaching the handle to the runtime failed.
    isatty: bool,
}
209
210impl Stdout {
211 pub(crate) fn new() -> Self {
212 let stdout = io::stdout();
213 let isatty = *STDOUT_ISATTY.get_or_init(|| {
214 stdout.is_terminal()
215 || Runtime::with_current(|r| r.attach(stdout.as_raw_handle() as _)).is_err()
216 });
217 Self {
218 fd: SharedFd::new(StaticFd(stdout.as_raw_handle())),
219 isatty,
220 }
221 }
222}
223
224impl AsyncWrite for Stdout {
225 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
226 if self.isatty {
227 let op = StdWrite::new(io::stdout(), buf);
228 compio_runtime::submit(op).await.into_inner()
229 } else {
230 let op = Send::new(self.fd.clone(), buf);
231 compio_runtime::submit(op).await.into_inner()
232 }
233 }
234
235 async fn flush(&mut self) -> io::Result<()> {
236 Ok(())
237 }
238
239 async fn shutdown(&mut self) -> io::Result<()> {
240 self.flush().await
241 }
242}
243
244impl AsRawFd for Stdout {
245 fn as_raw_fd(&self) -> RawFd {
246 self.fd.as_raw_fd()
247 }
248}
249
/// Process-wide cache of the stderr probe performed by `Stderr::new`: `true`
/// when stderr is a terminal or could not be attached to the runtime.
static STDERR_ISATTY: OnceLock<bool> = OnceLock::new();
251
/// Asynchronous handle to the standard error of the current process.
///
/// See also [`stderr`].
#[derive(Debug, Clone)]
pub struct Stderr {
    // Shared wrapper around the raw stderr handle, used by the non-tty path.
    fd: SharedFd<StaticFd>,
    // `true` selects the blocking `std::io::stderr()` path: set when stderr is
    // a terminal or when attaching the handle to the runtime failed.
    isatty: bool,
}
260
261impl Stderr {
262 pub(crate) fn new() -> Self {
263 let stderr = io::stderr();
264 let isatty = *STDERR_ISATTY.get_or_init(|| {
265 stderr.is_terminal()
266 || Runtime::with_current(|r| r.attach(stderr.as_raw_handle() as _)).is_err()
267 });
268 Self {
269 fd: SharedFd::new(StaticFd(stderr.as_raw_handle())),
270 isatty,
271 }
272 }
273}
274
275impl AsyncWrite for Stderr {
276 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
277 if self.isatty {
278 let op = StdWrite::new(io::stderr(), buf);
279 compio_runtime::submit(op).await.into_inner()
280 } else {
281 let op = Send::new(self.fd.clone(), buf);
282 compio_runtime::submit(op).await.into_inner()
283 }
284 }
285
286 async fn flush(&mut self) -> io::Result<()> {
287 Ok(())
288 }
289
290 async fn shutdown(&mut self) -> io::Result<()> {
291 self.flush().await
292 }
293}
294
295impl AsRawFd for Stderr {
296 fn as_raw_fd(&self) -> RawFd {
297 self.fd.as_raw_fd()
298 }
299}