async_blocking_stdio/
lib.rs

1//! [stdio]: `std::io`
2//!
3//! [stdstdout]: `std::io::Stdout`
4//! [stdstderr]: `std::io::Stderr`
5//! [stdstdin]: `std::io::Stdin`
6//!
7//! [stdstdoutlock]: `std::io::StdoutLock`
8//! [stdstderrlock]: `std::io::StderrLock`
9//! [stdstdinlock]: `std::io::StdinLock`
10//!
11//! [send-trait]: `core::marker::Send`
12//!
13//! [blocking-unblock]: `blocking::Unblock`
14//! [fl-block-on]: `futures_lite::io::BlockOn`
15//! [fl-io]: `futures_lite::io`
16//!
17//! [selfstdoutstruct]: `StdoutUnblock`
18//! [selfstderrstruct]: `StderrUnblock`
19//! [selfstdinstruct]: `StdinUnblock`
20//!
21//! [selfstdoutfn]: `stdout`
22//! [selfstderrfn]: `stderr`
23//! [selfstdinfn]: `stdin`
24#![doc = include_str!("../README.md")]
25use blocking::Unblock;
26use core::pin::Pin;
27use futures_lite::{io::BufReader, AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, Stream};
28use std::{
29    io,
30    sync::{LazyLock, OnceLock},
31};
32
33mod sealed {
34    use blocking::Unblock;
35    use futures_lite::io::BufReader;
36
37    /// A [`std::io`] standard io stream.
38    pub trait StdioSealed {
39        /// Inner implementation on how, exactly, to actually unblock the stream.
40        ///
41        /// Some streams require extra wrappers to properly unblock them while retaining
42        /// full efficiency - this provides that.
43        type Unblocked;
44    }
45
46    impl StdioSealed for std::io::Stdin {
47        /// We need [`BufReader`] here - [`Unblock`] contains no implementation of [`BufReader`]
48        /// because it would need to implement special operation modes. To get back the buffering,
49        /// we use a [`BufReader`] - the async form - outside it.
50        type Unblocked = BufReader<Unblock<Self>>;
51    }
52    impl StdioSealed for std::io::Stdout {
53        type Unblocked = Unblock<Self>;
54    }
55    impl StdioSealed for std::io::Stderr {
56        type Unblocked = Unblock<Self>;
57    }
58}
59
/// Extension trait giving each [`std::io`] standard stream type access to its asynchronous
/// counterpart in this crate.
///
/// The trait is sealed through its private supertrait, so it is only implemented for the stream
/// types this crate chooses.
pub trait StdioExt: sealed::StdioSealed {
    /// Handle to the async version of the standard io stream.
    ///
    /// A handle by itself is not a stream; it has to be locked first.
    type Handle;

    /// Lock on the async handle to this async-wrapped/unblocked standard io stream.
    type Locked<'lt>;

    /// Lock on the async handle to this async-wrapped/unblocked standard io stream, wrapped again
    /// so that the synchronous traits from [`std::io`] can be used on it.
    type LockedAndSync<'lt>;

    /// Get the handle to the global asynchronous version of this standard io stream as provided by
    /// [`async_blocking_stdio`][crate].
    ///
    /// This is an associated function (no `self`), so it is called on the stream *type*.
    fn async_handle() -> Self::Handle
    where
        Self::Handle: Sized;
}
79
/// Prelude module - re-exports the [`crate::StdioExt`] trait.
///
/// The trait is imported anonymously (`as _`), so a glob import of this module makes the trait's
/// items usable without binding the name `StdioExt` in the importing scope.
pub mod prelude {
    pub use super::StdioExt as _;
}
84
85type SeqMutex<T> = async_lock::Mutex<T>;
86type SeqMutexGuard<'lt, T> = async_lock::MutexGuard<'lt, T>;
87
88/// Inner storage type that is stored exactly once. The type parameter is actually the full
89/// representation including any Unblock or BufReaders
90#[derive(Debug)]
91struct MutexUnblock<T: ?Sized>(pub SeqMutex<T>);
92
93impl<T: ?Sized> MutexUnblock<T> {
94    /// Lock it in an async context
95    #[inline]
96    pub async fn lock(&self) -> MutexUnblockGuard<'_, T> {
97        MutexUnblockGuard::new(self.0.lock().await)
98    }
99
100    /// Lock it in a sync context - do not use in async context or risk deadlocks!
101    #[inline]
102    pub fn lock_blocking(&self) -> MutexUnblockGuard<'_, T> {
103        MutexUnblockGuard::new(self.0.lock_blocking())
104    }
105
106    /// Attempt to lock it if available
107    #[inline]
108    pub fn try_lock(&self) -> Option<MutexUnblockGuard<'_, T>> {
109        self.0.try_lock().map(MutexUnblockGuard::new)
110    }
111}
112
113/// Wrapper type for making an async-locked [`blocking::Unblock`] Stdio stream. Parameter is one of
114/// the [`io::Stdout`]/[`io::Stderr`]/[`io::Stdin`] types plus any [`blocking::Unblock`] or other wrappers.
115/// This holds a static reference to an internal shared mutex. Some things may need outer
116/// BufRead/BufWrite attachments to make it match the equivalent sync trait impls - this is done as
117/// much as possible to match the stdio inside the stdlib.
118///
119/// Unlike the [`std::io`] unlocked-handles, this does not implement the various [`futures_lite::io`]
120/// traits. You need to use [`MutexUnblockHandle::lock`] or other locking methods to get access to a
121/// handle which can be used with these traits. Unlike the Standard Library, these handles are not
122/// re-entrant, which means that attempting to lock them in some inner future while the outer lock
123/// is held will cause a deadlock.
124///
125/// An important note is that [`Unblock`] implements [`Unpin`] unconditionally. This makes it easy to
126/// make generic implementations that will work nicely.
127#[derive(Debug)]
128pub struct MutexUnblockHandle<T: 'static + ?Sized>(&'static MutexUnblock<T>);
129
130impl<T: ?Sized> MutexUnblockHandle<T> {
131    /// Locks this handle asynchronously.
132    #[inline]
133    pub async fn lock(&self) -> MutexUnblockGuard<'static, T> {
134        self.0.lock().await
135    }
136
137    /// Lock this handle, but produce something that implements the synchronous [`std::io`]
138    /// traits.
139    #[inline]
140    pub async fn lock_into_sync(&self) -> futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>> {
141        futures_lite::io::BlockOn::new(self.lock().await)
142    }
143
144    /// Lock this handle in a sync context. Do not call in an async context or risk deadlocks!.
145    #[inline]
146    pub fn lock_blocking(&self) -> MutexUnblockGuard<'static, T> {
147        self.0.lock_blocking()
148    }
149
150    /// Lock this handle in a sync context, but producing something that implements the
151    /// synchronous [`std::io`] traits.
152    ///
153    /// Do not call in an async context or risk deadlocks!.
154    #[inline]
155    pub fn lock_blocking_into_sync(
156        &self,
157    ) -> futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>> {
158        futures_lite::io::BlockOn::new(self.lock_blocking())
159    }
160
161    /// Attempt to lock this handle if you can
162    #[inline]
163    pub fn try_lock(&self) -> Option<MutexUnblockGuard<'static, T>> {
164        self.0.try_lock()
165    }
166
167    /// Attempt to lock this handle and wrap it in something that can be used with the synchronous
168    /// [`std::io`] traits.
169    #[inline]
170    pub fn try_lock_into_sync(
171        &self,
172    ) -> Option<futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>>> {
173        self.try_lock().map(futures_lite::io::BlockOn::new)
174    }
175}
176
177/// Handle to the globally-synchronised structure for async standard input.
178pub type StdinUnblock = <io::Stdin as StdioExt>::Handle;
179
180/// Handle to the globally-synchronised structure for async standard output.
181pub type StdoutUnblock = <io::Stdout as StdioExt>::Handle;
182
183/// Handle to the globally-synchronised structure for async standard error.
184pub type StderrUnblock = <io::Stderr as StdioExt>::Handle;
185
pin_project_lite::pin_project! {
    /// Wrapper type indicating a held async-locked [`blocking::Unblock`] Stdio stream. Parameter is
    /// the [`io::Stdout`]/[`io::Stderr`]/[`io::Stdin`] type together with its wrappers.
    ///
    /// Using the various [`futures_lite::io`] traits on values of this type has all the same caveats
    /// of using them on a [`blocking::Unblock`].
    ///
    /// This is not re-entrant. This means that if you hold this lock while waiting for another
    /// access to the same asynchronous-friendly mutex to be locked (e.g. if you create a future by
    /// calling an async function that waits for the corresponding mutex to be unlocked, and then
    /// wait for that future to complete while holding the lock), it will cause a deadlock.
    ///
    /// This also deliberately does not expose any sort of `Deref` implementation, to avoid
    /// exposing the internals of the type for public consumption.
    #[derive(Debug)]
    #[clippy::has_significant_drop]
    pub struct MutexUnblockGuard<'lt, T: ?Sized> {
        // This uses pin_project - the inner mutex guard needs to be pinned to access the internals
        // in a pinned way. The field stays private so the wrapped guard never leaks.
        #[pin] inner: SeqMutexGuard<'lt, T>
    }
}
208
209impl<'lt, T: ?Sized> MutexUnblockGuard<'lt, T> {
210    /// Internal construction function - not exposed as `From` because this is not a public API
211    #[inline]
212    fn new(inner: SeqMutexGuard<'lt, T>) -> Self {
213        Self { inner }
214    }
215}
216
217/// Lock on the global asynchronous accessor for Standard Input
218///
219/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
220/// to wait to lock this, then it will cause a deadlock.
221pub type StdinUnblockLock<'lt> = <std::io::Stdin as StdioExt>::Locked<'lt>;
222
223/// Lock on the global asynchronous accessor for Standard Output
224///
225/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
226/// to wait to lock this, then it will cause a deadlock.
227pub type StdoutUnblockLock<'lt> = <std::io::Stdout as StdioExt>::Locked<'lt>;
228
229/// Lock on the global asynchronous accessor for Standard Error
230///
231/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
232/// to wait to lock this, then it will cause a deadlock.
233pub type StderrUnblockLock<'lt> = <std::io::Stderr as StdioExt>::Locked<'lt>;
234
235impl StdioExt for std::io::Stdout {
236    type Handle = MutexUnblockHandle<Self::Unblocked>;
237
238    type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;
239
240    type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'static>>;
241
242    #[inline]
243    fn async_handle() -> Self::Handle
244    where
245        Self::Handle: Sized,
246    {
247        stdout()
248    }
249}
250
251impl StdioExt for std::io::Stderr {
252    type Handle = MutexUnblockHandle<Self::Unblocked>;
253
254    type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;
255
256    type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'static>>;
257
258    #[inline]
259    fn async_handle() -> Self::Handle
260    where
261        Self::Handle: Sized,
262    {
263        stderr()
264    }
265}
266
267impl StdioExt for std::io::Stdin {
268    type Handle = MutexUnblockHandle<Self::Unblocked>;
269
270    type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;
271
272    type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'static>>;
273
274    #[inline]
275    fn async_handle() -> Self::Handle
276    where
277        Self::Handle: Sized,
278    {
279        stdin()
280    }
281}
282
283pub(crate) mod cleanup {
284    use futures_lite::AsyncWriteExt as _;
285    use std::{io, sync};
286
287    use crate::{STDERR, STDOUT};
288    /// If [STDOUT] is initialised, this flushes the stream as a cleanup mechanism. This is important
289    /// because [Unblock] has an internal buffer separate from the inner stdout (which has a similar
290    /// cleanup mechanism).
291    ///
292    /// This function ensures flushing occurs no more than once, even if called multiple times.
293    fn cleanup_flush_stdout() {
294        static FLUSH_ONCE: sync::Once = sync::Once::new();
295        // It's important  we use .get() here rather than simple deref, because if we did that, and
296        // the program exited during initialization, it could cause infinite loops, or deadlock, or
297        // something like that (as then it would try to initialize again).
298        if let Some(init_stdout) = STDOUT.get() {
299            FLUSH_ONCE.call_once(|| {
300                let mut init_stdout = init_stdout.lock_blocking();
301
302                let _result = nolocal_block_on::block_on(async {
303                    loop {
304                        match init_stdout.flush().await {
305                            Ok(()) => break Ok(()),
306                            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
307                            Err(e) => break Err(e),
308                        }
309                    }
310                });
311            })
312        }
313    }
314
315    /// If [STDERR] is initialised, this flushes the stream as a cleanup mechanism. This is important
316    /// because [Unblock] has an internal buffer separate from the inner stderr. The inner stderr
317    /// itself is not buffered, though.
318    ///
319    /// This function ensures flushing occurs no more than once, even if called multiple times.
320    fn cleanup_flush_stderr() {
321        static FLUSH_ONCE: sync::Once = sync::Once::new();
322        // It's important  we use .get() here rather than simple deref, because if we did that, and
323        // the program exited during initialization, it could cause infinite loops, or deadlock, or
324        // something like that (as then it would try to initialize again).
325        if let Some(init_stderr) = STDERR.get() {
326            FLUSH_ONCE.call_once(|| {
327                let mut init_stderr = init_stderr.lock_blocking();
328
329                let _result = nolocal_block_on::block_on(async {
330                    loop {
331                        match init_stderr.flush().await {
332                            Ok(()) => break Ok(()),
333                            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
334                            Err(e) => break Err(e),
335                        }
336                    }
337                });
338            })
339        }
340    }
341
342    /// This invokes the [cleanup_flush_stdout] and [cleanup_flush_stderr] hooks in the correct order
343    /// (stderr is flushed first as it's likely to be most important).
344    pub(super) extern "C" fn cleanup() {
345        cleanup_flush_stderr();
346        cleanup_flush_stdout();
347    }
348}
349
350/// [`std::sync::LazyLock`] that registers the cleanup callbacks for [STDOUT] and [STDERR].
351///
352/// The actual value is a boolean indicating if it succeeded in registration or not.
353static ASYNC_STDIO_CLEANUP_REGISTERED: LazyLock<bool> = LazyLock::new(|| {
354    // SAFETY - cleanup functions include internal synchronisation to ensure cleanup does not occur
355    // more than once, even if libc ends up doing weird things.
356    unsafe { libc::atexit(cleanup::cleanup) == 0 }
357});
358
359static STDOUT: OnceLock<MutexUnblock<Unblock<io::Stdout>>> = OnceLock::new();
360
361/// Get the synchronised [`blocking::Unblock`]-ed stdout. A best-effort attempt is made to flush
362/// the asynchronous standard output upon program exit (including things stored in the pipe it uses
363/// to do async things).
364///
365/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
366/// future/task that is being waited for. If you try, it will cause a deadlock.
367#[must_use]
368pub fn stdout() -> StdoutUnblock {
369    MutexUnblockHandle(STDOUT.get_or_init(|| {
370        let r = MutexUnblock(SeqMutex::new(Unblock::new(io::stdout())));
371        if !(*ASYNC_STDIO_CLEANUP_REGISTERED) {
372            panic!("could not register async stdio cleanup functions")
373        }
374        r
375    }))
376}
377
378static STDERR: OnceLock<MutexUnblock<Unblock<io::Stderr>>> = OnceLock::new();
379
380/// Get the synchronised [`blocking::Unblock`]-ed stderr. A best-effort attempt is made to flush
381/// the asynchronous standard error upon program exit (including things stored in the pipe it uses
382/// to do async things).
383///
384/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
385/// future/task that is being waited for. If you try, it will cause a deadlock.
386#[must_use]
387pub fn stderr() -> StderrUnblock {
388    MutexUnblockHandle(STDERR.get_or_init(|| {
389        let r = MutexUnblock(SeqMutex::new(Unblock::new(io::stderr())));
390        if !(*ASYNC_STDIO_CLEANUP_REGISTERED) {
391            panic!("could not register async stdio cleanup functions");
392        }
393        r
394    }))
395}
396
397static STDIN: OnceLock<MutexUnblock<BufReader<Unblock<io::Stdin>>>> = OnceLock::new();
398
399/// Get the synchronised [`blocking::Unblock`]-ed stdin
400///
401/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
402/// future/task that is being waited for. If you try, it will cause a deadlock.
403#[must_use]
404pub fn stdin() -> StdinUnblock {
405    MutexUnblockHandle(
406        STDIN
407            .get_or_init(|| MutexUnblock(SeqMutex::new(BufReader::new(Unblock::new(io::stdin()))))),
408    )
409}
410
411impl<T: ?Sized> MutexUnblockGuard<'_, T> {
412    /// Get the pinned form of the stored, synchronised value from this guard structure.
413    ///
414    /// This is not public, such as to avoid leaking `T`
415    #[inline]
416    pub(crate) fn inner_pinned(self: Pin<&mut Self>) -> Pin<&mut T>
417    where
418        T: Unpin,
419    {
420        Pin::new(self.project().inner.get_mut())
421    }
422}
423
424// MANUAL LOCK IMPLS
425impl<T: AsyncRead + Unpin + ?Sized> AsyncRead for MutexUnblockGuard<'_, T> {
426    #[inline]
427    fn poll_read(
428        self: std::pin::Pin<&mut Self>,
429        cx: &mut std::task::Context<'_>,
430        buf: &mut [u8],
431    ) -> std::task::Poll<io::Result<usize>> {
432        self.inner_pinned().poll_read(cx, buf)
433    }
434
435    #[inline]
436    fn poll_read_vectored(
437        self: std::pin::Pin<&mut Self>,
438        cx: &mut std::task::Context<'_>,
439        bufs: &mut [io::IoSliceMut<'_>],
440    ) -> std::task::Poll<io::Result<usize>> {
441        self.inner_pinned().poll_read_vectored(cx, bufs)
442    }
443}
444
445impl<T: AsyncBufRead + Unpin + ?Sized> AsyncBufRead for MutexUnblockGuard<'_, T> {
446    #[inline]
447    fn poll_fill_buf(
448        self: Pin<&mut Self>,
449        cx: &mut std::task::Context<'_>,
450    ) -> std::task::Poll<io::Result<&[u8]>> {
451        self.inner_pinned().poll_fill_buf(cx)
452    }
453
454    #[inline]
455    fn consume(self: Pin<&mut Self>, amt: usize) {
456        self.inner_pinned().consume(amt)
457    }
458}
459
460impl<T: AsyncSeek + Unpin + ?Sized> AsyncSeek for MutexUnblockGuard<'_, T> {
461    #[inline]
462    fn poll_seek(
463        self: Pin<&mut Self>,
464        cx: &mut std::task::Context<'_>,
465        pos: io::SeekFrom,
466    ) -> std::task::Poll<io::Result<u64>> {
467        self.inner_pinned().poll_seek(cx, pos)
468    }
469}
470
471impl<T: AsyncWrite + Unpin + ?Sized> AsyncWrite for MutexUnblockGuard<'_, T> {
472    #[inline]
473    fn poll_write(
474        self: Pin<&mut Self>,
475        cx: &mut std::task::Context<'_>,
476        buf: &[u8],
477    ) -> std::task::Poll<io::Result<usize>> {
478        self.inner_pinned().poll_write(cx, buf)
479    }
480
481    #[inline]
482    fn poll_flush(
483        self: Pin<&mut Self>,
484        cx: &mut std::task::Context<'_>,
485    ) -> std::task::Poll<io::Result<()>> {
486        self.inner_pinned().poll_flush(cx)
487    }
488
489    #[inline]
490    fn poll_close(
491        self: Pin<&mut Self>,
492        cx: &mut std::task::Context<'_>,
493    ) -> std::task::Poll<io::Result<()>> {
494        self.inner_pinned().poll_close(cx)
495    }
496
497    #[inline]
498    fn poll_write_vectored(
499        self: Pin<&mut Self>,
500        cx: &mut std::task::Context<'_>,
501        bufs: &[io::IoSlice<'_>],
502    ) -> std::task::Poll<io::Result<usize>> {
503        self.inner_pinned().poll_write_vectored(cx, bufs)
504    }
505}
506
507impl<T: Stream + Unpin + ?Sized> Stream for MutexUnblockGuard<'_, T> {
508    type Item = T::Item;
509
510    #[inline]
511    fn poll_next(
512        self: Pin<&mut Self>,
513        cx: &mut std::task::Context<'_>,
514    ) -> std::task::Poll<Option<Self::Item>> {
515        self.inner_pinned().poll_next(cx)
516    }
517
518    #[inline]
519    fn size_hint(&self) -> (usize, Option<usize>) {
520        self.inner.size_hint()
521    }
522}
523
524// async-blocking-stdio - std::io::std{in(), out(), err()}, but async
525// Copyright (C) 2024  Matti Bryce <mattibryce at protonmail dot com>
526//
527// This program is free software: you can redistribute it and/or modify
528// it under the terms of the GNU General Public License as published by
529// the Free Software Foundation, either version 3 of the License, or
530// (at your option) any later version.
531//
532// This program is distributed in the hope that it will be useful,
533// but WITHOUT ANY WARRANTY; without even the implied warranty of
534// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
535// GNU General Public License for more details.
536//
537// You should have received a copy of the GNU General Public License
538// along with this program.  If not, see <https://www.gnu.org/licenses/>.