async_lsp/
stdio.rs

1//! Utilities to deal with stdin/stdout communication channel for Language Servers.
2//!
//! Typically Language Servers serve on stdin/stdout by default. But generally they cannot be read
//! or written asynchronously, due to technical reasons.
5//! (Eg. [`tokio::io::stdin`](https://docs.rs/tokio/1.27.0/tokio/io/fn.stdin.html) delegates reads
6//! to blocking threads.)
7//!
//! This mod defines [`PipeStdin`] and [`PipeStdout`] for stdin/stdout with pipe-like backends
//! only, which actually support asynchronous reads and writes. This currently means one of:
10//! - FIFO pipes. Eg. named pipes [mkfifo(3)] and unnamed pipes [pipe(2)].
11//! - Sockets. Eg. TCP connections and UNIX domain sockets [unix(7)].
12//! - Character devices. Eg. [tty(4)] or [pty(7)].
13//!
14//! [mkfifo(3)]: https://man7.org/linux/man-pages/man3/mkfifo.3.html
15//! [pipe(2)]: https://man7.org/linux/man-pages/man2/pipe.2.html
16//! [unix(7)]: https://man7.org/linux/man-pages/man7/unix.7.html
17//! [tty(4)]: https://man7.org/linux/man-pages/man4/tty.4.html
18//! [pty(7)]: https://man7.org/linux/man-pages/man7/pty.7.html
19//!
//! When calling [`PipeStdin::lock`], it locks the stdin using [`std::io::stdin`], sets its mode to
//! asynchronous, and exposes a raw [`Read`] interface bypassing the std's buffer.
22//!
23//! # Caveats
24//!
//! Since `PipeStd{in,out}` bypass the std's internal buffer, you should not leave any data in that
//! buffer (via [`std::io::stdin`], [`print!`]-like macros, etc.). Otherwise that data will be
//! ignored during `PipeStd{in,out}` operations, which is typically a logic error.
28//!
//! # Asynchronous I/O drivers
30//!
31//! ## `async-io`
32//!
33//! Wrapping `PipeStd{in,out}` inside `async_io::Async<T>` works. This should also work for other
34//! drivers with a similar generic asynchronous adapter interface.
35//!
//! For `async-io` >= 2, feature `async-io` should be enabled to let `Async<PipeStd{in,out}>`
//! implement `Async{Read,Write}`. `async-io` < 2 does not require it to work.
38//! See more details in: <https://github.com/smol-rs/async-io/pull/142>
39//!
40//! ```
41//! # async fn work() -> std::io::Result<()> {
42//! use futures::AsyncWriteExt;
43//!
44//! let mut stdout = async_io::Async::new(async_lsp::stdio::PipeStdout::lock()?)?;
45//! // For `async-io` >= 2, this requires `async-io` feature to work.
46#![cfg_attr(not(feature = "async-io"), doc = "# #[cfg(never)]")]
47//! stdout.write_all(b"never spawns blocking tasks").await?;
//! // This always works.
49//! (&stdout).write_all(b"never spawns blocking tasks").await?;
50//! # Ok(())
51//! # }
52//! ```
53//!
54//! ## `tokio`
55//!
56//! There are methods `PipeStd{in,out}::{lock,try_into}_tokio` gated under feature `tokio` to
57//! work with `tokio` runtime. The returned type implements corresponding
58//! `tokio::io::Async{Read,Write}` interface.
59//!
60//! ```
61//! # #[cfg(feature = "tokio")]
62//! # async fn work() -> std::io::Result<()> {
63//! use tokio::io::AsyncWriteExt;
64//!
65//! let mut stdout = async_lsp::stdio::PipeStdout::lock_tokio()?;
66//! stdout.write_all(b"never spawns blocking tasks").await?;
67//! # Ok(())
68//! # }
69//! ```
70use std::io::{self, Error, ErrorKind, IoSlice, Read, Result, StdinLock, StdoutLock, Write};
71use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
72
73use rustix::fs::{fcntl_getfl, fcntl_setfl, fstat, FileType, OFlags};
74
75#[derive(Debug)]
76struct NonBlocking<T: AsFd> {
77    inner: T,
78    prev_flags: OFlags,
79}
80
81impl<T: AsFd> NonBlocking<T> {
82    fn new(inner: T) -> Result<Self> {
83        let ft = FileType::from_raw_mode(fstat(&inner)?.st_mode);
84        if !matches!(
85            ft,
86            FileType::Fifo | FileType::Socket | FileType::CharacterDevice
87        ) {
88            return Err(Error::new(
89                ErrorKind::Other,
90                format!("File type {ft:?} is not pipe-like"),
91            ));
92        }
93
94        let prev_flags = fcntl_getfl(&inner)?;
95        fcntl_setfl(&inner, prev_flags | OFlags::NONBLOCK)?;
96        Ok(Self { inner, prev_flags })
97    }
98}
99
100impl<T: AsFd> Drop for NonBlocking<T> {
101    fn drop(&mut self) {
102        let _: std::result::Result<_, _> = fcntl_setfl(&self.inner, self.prev_flags);
103    }
104}
105
106/// Locked stdin for asynchronous read.
107#[derive(Debug)]
108pub struct PipeStdin {
109    inner: NonBlocking<StdinLock<'static>>,
110}
111
112impl PipeStdin {
113    /// Lock stdin with pipe-like backend and set it to asynchronous mode.
114    ///
115    /// # Errors
116    ///
117    /// Fails if the underlying FD is not pipe-like, or error occurs when setting mode.
118    /// See [module level documentation](index.html) for more details.
119    pub fn lock() -> Result<Self> {
120        let inner = NonBlocking::new(io::stdin().lock())?;
121        Ok(Self { inner })
122    }
123}
124
125impl AsFd for PipeStdin {
126    fn as_fd(&self) -> BorrowedFd<'_> {
127        self.inner.inner.as_fd()
128    }
129}
130
131impl AsRawFd for PipeStdin {
132    fn as_raw_fd(&self) -> RawFd {
133        self.inner.inner.as_raw_fd()
134    }
135}
136
// SAFETY: `PipeStdin` keeps the same stdin lock for its whole lifetime, so the descriptor
// returned by `AsFd::as_fd` never changes.
#[cfg(feature = "async-io")]
unsafe impl async_io::IoSafe for PipeStdin {}
140
141// NB. Bypass the internal buffer of `StdinLock` here to keep this in sync with the readiness of
142// the underlying FD (which is relied by the I/O re/actor).
143impl Read for &'_ PipeStdin {
144    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
145        rustix::io::read(self, buf).map_err(Into::into)
146    }
147
148    fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> Result<usize> {
149        rustix::io::readv(self, bufs).map_err(Into::into)
150    }
151}
152
153impl Read for PipeStdin {
154    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
155        <&PipeStdin>::read(&mut &*self, buf)
156    }
157
158    fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> Result<usize> {
159        <&PipeStdin>::read_vectored(&mut &*self, bufs)
160    }
161}
162
163/// Locked stdout for asynchronous read.
164#[derive(Debug)]
165pub struct PipeStdout {
166    inner: NonBlocking<StdoutLock<'static>>,
167}
168
169impl PipeStdout {
170    /// Lock stdout with pipe-like backend and set it to asynchronous mode.
171    ///
172    /// # Errors
173    /// Fails if the underlying FD is not pipe-like, or error occurs when setting mode.
174    /// See [module level documentation](index.html) for more details.
175    pub fn lock() -> Result<Self> {
176        let inner = NonBlocking::new(io::stdout().lock())?;
177        Ok(Self { inner })
178    }
179}
180
181impl AsFd for PipeStdout {
182    fn as_fd(&self) -> BorrowedFd<'_> {
183        self.inner.inner.as_fd()
184    }
185}
186
187impl AsRawFd for PipeStdout {
188    fn as_raw_fd(&self) -> RawFd {
189        self.inner.inner.as_raw_fd()
190    }
191}
192
// SAFETY: `PipeStdout` keeps the same stdout lock for its whole lifetime, so the descriptor
// returned by `AsFd::as_fd` never changes.
#[cfg(feature = "async-io")]
unsafe impl async_io::IoSafe for PipeStdout {}
196
197// NB. See `Read` impl.
198impl Write for &'_ PipeStdout {
199    fn write(&mut self, buf: &[u8]) -> Result<usize> {
200        rustix::io::write(self, buf).map_err(Into::into)
201    }
202
203    fn flush(&mut self) -> Result<()> {
204        Ok(())
205    }
206
207    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
208        rustix::io::writev(self, bufs).map_err(Into::into)
209    }
210}
211
212impl Write for PipeStdout {
213    fn write(&mut self, buf: &[u8]) -> Result<usize> {
214        <&PipeStdout>::write(&mut &*self, buf)
215    }
216
217    fn flush(&mut self) -> Result<()> {
218        <&PipeStdout>::flush(&mut &*self)
219    }
220
221    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
222        <&PipeStdout>::write_vectored(&mut &*self, bufs)
223    }
224}
225
226// Tokio compatibility.
227// We can simplify these if we have https://github.com/tokio-rs/tokio/issues/5785
228#[cfg(feature = "tokio")]
229#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
230mod tokio_impl {
231    use std::pin::Pin;
232    use std::task::{Context, Poll};
233
234    use futures::ready;
235    use tokio::io::unix::AsyncFd;
236    use tokio::io::{Interest, ReadBuf};
237
238    use super::*;
239
240    pub struct TokioPipeStdin {
241        inner: AsyncFd<PipeStdin>,
242    }
243
244    impl futures::AsyncRead for TokioPipeStdin {
245        fn poll_read(
246            self: Pin<&mut Self>,
247            cx: &mut Context<'_>,
248            buf: &mut [u8],
249        ) -> Poll<Result<usize>> {
250            loop {
251                let mut guard = ready!(self.inner.poll_read_ready(cx))?;
252                match guard.try_io(|inner| inner.get_ref().read(buf)) {
253                    Ok(ret) => return Poll::Ready(ret),
254                    Err(_would_block) => continue,
255                }
256            }
257        }
258
259        fn poll_read_vectored(
260            self: Pin<&mut Self>,
261            cx: &mut Context<'_>,
262            bufs: &mut [io::IoSliceMut<'_>],
263        ) -> Poll<Result<usize>> {
264            loop {
265                let mut guard = ready!(self.inner.poll_read_ready(cx))?;
266                match guard.try_io(|inner| inner.get_ref().read_vectored(bufs)) {
267                    Ok(ret) => return Poll::Ready(ret),
268                    Err(_would_block) => continue,
269                }
270            }
271        }
272    }
273
274    impl tokio::io::AsyncRead for TokioPipeStdin {
275        fn poll_read(
276            self: Pin<&mut Self>,
277            cx: &mut Context<'_>,
278            buf: &mut ReadBuf<'_>,
279        ) -> Poll<io::Result<()>> {
280            let len = loop {
281                let mut guard = ready!(self.inner.poll_read_ready(cx))?;
282                match guard.try_io(|inner| {
283                    // SAFETY: `read()` does not de-initialize any byte.
284                    let (written, _) = rustix::io::read(inner, unsafe { buf.unfilled_mut() })?;
285                    Ok(written.len())
286                }) {
287                    Ok(ret) => break ret?,
288                    Err(_would_block) => continue,
289                }
290            };
291            buf.advance(len);
292            Poll::Ready(Ok(()))
293        }
294    }
295
296    impl PipeStdin {
297        /// Shortcut to [`PipeStdin::lock`] and then [`PipeStdin::try_into_tokio`].
298        ///
299        /// # Errors
300        ///
301        /// Fails if cannot create [`AsyncFd`].
302        pub fn lock_tokio() -> Result<TokioPipeStdin> {
303            Self::lock()?.try_into_tokio()
304        }
305
306        /// Register the FD to the tokio runtime and return a tokio compatible reader.
307        ///
308        /// # Errors
309        ///
310        /// Fails if cannot create [`AsyncFd`].
311        pub fn try_into_tokio(self) -> Result<TokioPipeStdin> {
312            let inner = AsyncFd::with_interest(self, Interest::READABLE)?;
313            Ok(TokioPipeStdin { inner })
314        }
315    }
316
317    pub struct TokioPipeStdout {
318        inner: AsyncFd<PipeStdout>,
319    }
320
321    impl futures::AsyncWrite for TokioPipeStdout {
322        fn poll_write(
323            self: Pin<&mut Self>,
324            cx: &mut Context<'_>,
325            buf: &[u8],
326        ) -> Poll<Result<usize>> {
327            loop {
328                let mut guard = ready!(self.inner.poll_write_ready(cx))?;
329                match guard.try_io(|inner| inner.get_ref().write(buf)) {
330                    Ok(result) => return Poll::Ready(result),
331                    Err(_would_block) => continue,
332                }
333            }
334        }
335
336        fn poll_write_vectored(
337            self: Pin<&mut Self>,
338            cx: &mut Context<'_>,
339            bufs: &[IoSlice<'_>],
340        ) -> Poll<Result<usize>> {
341            loop {
342                let mut guard = ready!(self.inner.poll_write_ready(cx))?;
343                match guard.try_io(|inner| inner.get_ref().write_vectored(bufs)) {
344                    Ok(result) => return Poll::Ready(result),
345                    Err(_would_block) => continue,
346                }
347            }
348        }
349
350        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
351            Poll::Ready(Ok(()))
352        }
353
354        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
355            Poll::Ready(Ok(()))
356        }
357    }
358
359    impl tokio::io::AsyncWrite for TokioPipeStdout {
360        fn poll_write(
361            self: Pin<&mut Self>,
362            cx: &mut Context<'_>,
363            buf: &[u8],
364        ) -> Poll<Result<usize>> {
365            <Self as futures::AsyncWrite>::poll_write(self, cx, buf)
366        }
367
368        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
369            Poll::Ready(Ok(()))
370        }
371
372        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
373            Poll::Ready(Ok(()))
374        }
375    }
376
377    impl PipeStdout {
378        /// Shortcut to [`PipeStdout::lock`] and then [`PipeStdout::try_into_tokio`].
379        ///
380        /// # Errors
381        ///
382        /// Fails if cannot create [`AsyncFd`].
383        pub fn lock_tokio() -> Result<TokioPipeStdout> {
384            Self::lock()?.try_into_tokio()
385        }
386
387        /// Register the FD to the tokio runtime and return a tokio compatible writer.
388        ///
389        /// # Errors
390        ///
391        /// Fails if cannot create [`AsyncFd`].
392        pub fn try_into_tokio(self) -> Result<TokioPipeStdout> {
393            let inner = AsyncFd::with_interest(self, Interest::WRITABLE)?;
394            Ok(TokioPipeStdout { inner })
395        }
396    }
397}