compio_fs/stdio/
windows.rs

1use std::{
2    io::{self, IsTerminal, Read, Write},
3    os::windows::io::{AsRawHandle, BorrowedHandle, RawHandle},
4    pin::Pin,
5    sync::OnceLock,
6    task::Poll,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10use compio_driver::{
11    AsFd, AsRawFd, BorrowedFd, OpCode, OpType, RawFd, SharedFd,
12    op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send},
13};
14use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
15use compio_runtime::{BorrowedBuffer, BufferPool, Runtime};
16use windows_sys::Win32::System::IO::OVERLAPPED;
17
18#[cfg(doc)]
19use super::{stderr, stdin, stdout};
20
21struct StdRead<R: Read, B: IoBufMut> {
22    reader: R,
23    buffer: B,
24}
25
26impl<R: Read, B: IoBufMut> StdRead<R, B> {
27    pub fn new(reader: R, buffer: B) -> Self {
28        Self { reader, buffer }
29    }
30}
31
32impl<R: Read, B: IoBufMut> OpCode for StdRead<R, B> {
33    fn op_type(&self) -> OpType {
34        OpType::Blocking
35    }
36
37    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
38        let this = unsafe { self.get_unchecked_mut() };
39        let slice = this.buffer.as_mut_slice();
40        #[cfg(feature = "read_buf")]
41        {
42            let mut buf = io::BorrowedBuf::from(slice);
43            let mut cursor = buf.unfilled();
44            this.reader.read_buf(cursor.reborrow())?;
45            Poll::Ready(Ok(cursor.written()))
46        }
47        #[cfg(not(feature = "read_buf"))]
48        {
49            use std::mem::MaybeUninit;
50
51            slice.fill(MaybeUninit::new(0));
52            this.reader
53                .read(std::slice::from_raw_parts_mut(
54                    this.buffer.as_buf_mut_ptr(),
55                    this.buffer.buf_capacity(),
56                ))
57                .into()
58        }
59    }
60}
61
62impl<R: Read, B: IoBufMut> IntoInner for StdRead<R, B> {
63    type Inner = B;
64
65    fn into_inner(self) -> Self::Inner {
66        self.buffer
67    }
68}
69
70struct StdWrite<W: Write, B: IoBuf> {
71    writer: W,
72    buffer: B,
73}
74
75impl<W: Write, B: IoBuf> StdWrite<W, B> {
76    pub fn new(writer: W, buffer: B) -> Self {
77        Self { writer, buffer }
78    }
79}
80
81impl<W: Write, B: IoBuf> OpCode for StdWrite<W, B> {
82    fn op_type(&self) -> OpType {
83        OpType::Blocking
84    }
85
86    unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
87        let this = unsafe { self.get_unchecked_mut() };
88        let slice = this.buffer.as_slice();
89        this.writer.write(slice).into()
90    }
91}
92
93impl<W: Write, B: IoBuf> IntoInner for StdWrite<W, B> {
94    type Inner = B;
95
96    fn into_inner(self) -> Self::Inner {
97        self.buffer
98    }
99}
100
/// Thin wrapper around a raw Windows handle so it can implement the driver's
/// `AsFd` / `AsRawFd` traits.
///
/// No `Drop` impl is visible in this file, so the wrapper does not appear to
/// close the handle. NOTE(review): the name suggests the handle is expected
/// to stay valid for the whole process (the std handles) — confirm.
#[derive(Debug)]
struct StaticFd(RawHandle);
103
104impl AsFd for StaticFd {
105    fn as_fd(&self) -> BorrowedFd<'_> {
106        // SAFETY: we only use it for console handles.
107        BorrowedFd::File(unsafe { BorrowedHandle::borrow_raw(self.0) })
108    }
109}
110
111impl AsRawFd for StaticFd {
112    fn as_raw_fd(&self) -> RawFd {
113        self.0 as _
114    }
115}
116
// Process-wide cache of the check made by the first `Stdin::new` call: `true`
// when stdin is a terminal or could not be attached to the current runtime.
static STDIN_ISATTY: OnceLock<bool> = OnceLock::new();
118
/// A handle to the standard input stream of a process.
///
/// See [`stdin`].
#[derive(Debug, Clone)]
pub struct Stdin {
    // Shared wrapper around the raw stdin handle; used when reads are
    // submitted as `Recv` / `RecvManaged` operations.
    fd: SharedFd<StaticFd>,
    // When `true`, reads go through a blocking `StdRead` on `io::stdin()`
    // instead of the raw handle. Set in `Stdin::new`.
    isatty: bool,
}
127
128impl Stdin {
129    pub(crate) fn new() -> Self {
130        let stdin = io::stdin();
131        let isatty = *STDIN_ISATTY.get_or_init(|| {
132            stdin.is_terminal()
133                || Runtime::with_current(|r| r.attach(stdin.as_raw_handle() as _)).is_err()
134        });
135        Self {
136            fd: SharedFd::new(StaticFd(stdin.as_raw_handle())),
137            isatty,
138        }
139    }
140}
141
142impl AsyncRead for Stdin {
143    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
144        if self.isatty {
145            let op = StdRead::new(io::stdin(), buf);
146            compio_runtime::submit(op).await.into_inner()
147        } else {
148            let op = Recv::new(self.fd.clone(), buf);
149            compio_runtime::submit(op).await.into_inner()
150        }
151        .map_advanced()
152    }
153}
154
155impl AsyncReadManaged for Stdin {
156    type Buffer<'a> = BorrowedBuffer<'a>;
157    type BufferPool = BufferPool;
158
159    async fn read_managed<'a>(
160        &mut self,
161        buffer_pool: &'a Self::BufferPool,
162        len: usize,
163    ) -> io::Result<Self::Buffer<'a>> {
164        (&*self).read_managed(buffer_pool, len).await
165    }
166}
167
168impl AsyncReadManaged for &Stdin {
169    type Buffer<'a> = BorrowedBuffer<'a>;
170    type BufferPool = BufferPool;
171
172    async fn read_managed<'a>(
173        &mut self,
174        buffer_pool: &'a Self::BufferPool,
175        len: usize,
176    ) -> io::Result<Self::Buffer<'a>> {
177        let buffer_pool = buffer_pool.try_inner()?;
178        if self.isatty {
179            let buf = buffer_pool.get_buffer(len)?;
180            let op = StdRead::new(io::stdin(), buf);
181            let BufResult(res, buf) = compio_runtime::submit(op).await.into_inner();
182            let res = unsafe { buffer_pool.create_proxy(buf, res?) };
183            Ok(res)
184        } else {
185            let op = RecvManaged::new(self.fd.clone(), buffer_pool, len)?;
186            compio_runtime::submit_with_flags(op)
187                .await
188                .take_buffer(buffer_pool)
189        }
190    }
191}
192
193impl AsRawFd for Stdin {
194    fn as_raw_fd(&self) -> RawFd {
195        self.fd.as_raw_fd()
196    }
197}
198
// Process-wide cache of the check made by the first `Stdout::new` call: `true`
// when stdout is a terminal or could not be attached to the current runtime.
static STDOUT_ISATTY: OnceLock<bool> = OnceLock::new();
200
/// A handle to the standard output stream of a process.
///
/// See [`stdout`].
#[derive(Debug, Clone)]
pub struct Stdout {
    // Shared wrapper around the raw stdout handle; used when writes are
    // submitted as `Send` operations.
    fd: SharedFd<StaticFd>,
    // When `true`, writes go through a blocking `StdWrite` on `io::stdout()`
    // instead of the raw handle. Set in `Stdout::new`.
    isatty: bool,
}
209
210impl Stdout {
211    pub(crate) fn new() -> Self {
212        let stdout = io::stdout();
213        let isatty = *STDOUT_ISATTY.get_or_init(|| {
214            stdout.is_terminal()
215                || Runtime::with_current(|r| r.attach(stdout.as_raw_handle() as _)).is_err()
216        });
217        Self {
218            fd: SharedFd::new(StaticFd(stdout.as_raw_handle())),
219            isatty,
220        }
221    }
222}
223
224impl AsyncWrite for Stdout {
225    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
226        if self.isatty {
227            let op = StdWrite::new(io::stdout(), buf);
228            compio_runtime::submit(op).await.into_inner()
229        } else {
230            let op = Send::new(self.fd.clone(), buf);
231            compio_runtime::submit(op).await.into_inner()
232        }
233    }
234
235    async fn flush(&mut self) -> io::Result<()> {
236        Ok(())
237    }
238
239    async fn shutdown(&mut self) -> io::Result<()> {
240        self.flush().await
241    }
242}
243
244impl AsRawFd for Stdout {
245    fn as_raw_fd(&self) -> RawFd {
246        self.fd.as_raw_fd()
247    }
248}
249
// Process-wide cache of the check made by the first `Stderr::new` call: `true`
// when stderr is a terminal or could not be attached to the current runtime.
static STDERR_ISATTY: OnceLock<bool> = OnceLock::new();
251
/// A handle to the standard error stream of a process.
///
/// See [`stderr`].
#[derive(Debug, Clone)]
pub struct Stderr {
    // Shared wrapper around the raw stderr handle; used when writes are
    // submitted as `Send` operations.
    fd: SharedFd<StaticFd>,
    // When `true`, writes go through a blocking `StdWrite` on `io::stderr()`
    // instead of the raw handle. Set in `Stderr::new`.
    isatty: bool,
}
260
261impl Stderr {
262    pub(crate) fn new() -> Self {
263        let stderr = io::stderr();
264        let isatty = *STDERR_ISATTY.get_or_init(|| {
265            stderr.is_terminal()
266                || Runtime::with_current(|r| r.attach(stderr.as_raw_handle() as _)).is_err()
267        });
268        Self {
269            fd: SharedFd::new(StaticFd(stderr.as_raw_handle())),
270            isatty,
271        }
272    }
273}
274
275impl AsyncWrite for Stderr {
276    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
277        if self.isatty {
278            let op = StdWrite::new(io::stderr(), buf);
279            compio_runtime::submit(op).await.into_inner()
280        } else {
281            let op = Send::new(self.fd.clone(), buf);
282            compio_runtime::submit(op).await.into_inner()
283        }
284    }
285
286    async fn flush(&mut self) -> io::Result<()> {
287        Ok(())
288    }
289
290    async fn shutdown(&mut self) -> io::Result<()> {
291        self.flush().await
292    }
293}
294
295impl AsRawFd for Stderr {
296    fn as_raw_fd(&self) -> RawFd {
297        self.fd.as_raw_fd()
298    }
299}