async_blocking_stdio/lib.rs
1//! [stdio]: `std::io`
2//!
3//! [stdstdout]: `std::io::Stdout`
4//! [stdstderr]: `std::io::Stderr`
5//! [stdstdin]: `std::io::Stdin`
6//!
7//! [stdstdoutlock]: `std::io::StdoutLock`
8//! [stdstderrlock]: `std::io::StderrLock`
9//! [stdstdinlock]: `std::io::StdinLock`
10//!
11//! [send-trait]: `core::marker::Send`
12//!
13//! [blocking-unblock]: `blocking::Unblock`
14//! [fl-block-on]: `futures_lite::io::BlockOn`
15//! [fl-io]: `futures_lite::io`
16//!
17//! [selfstdoutstruct]: `StdoutUnblock`
18//! [selfstderrstruct]: `StderrUnblock`
19//! [selfstdinstruct]: `StdinUnblock`
20//!
21//! [selfstdoutfn]: `stdout`
22//! [selfstderrfn]: `stderr`
23//! [selfstdinfn]: `stdin`
24#![doc = include_str!("../README.md")]
25use blocking::Unblock;
26use core::pin::Pin;
27use futures_lite::{io::BufReader, AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, Stream};
28use std::{
29 io,
30 sync::{LazyLock, OnceLock},
31};
32
33mod sealed {
34 use blocking::Unblock;
35 use futures_lite::io::BufReader;
36
37 /// A [`std::io`] standard io stream.
38 pub trait StdioSealed {
39 /// Inner implementation on how, exactly, to actually unblock the stream.
40 ///
41 /// Some streams require extra wrappers to properly unblock them while retaining
42 /// full efficiency - this provides that.
43 type Unblocked;
44 }
45
46 impl StdioSealed for std::io::Stdin {
47 /// We need [`BufReader`] here - [`Unblock`] contains no implementation of [`BufReader`]
48 /// because it would need to implement special operation modes. To get back the buffering,
49 /// we use a [`BufReader`] - the async form - outside it.
50 type Unblocked = BufReader<Unblock<Self>>;
51 }
52 impl StdioSealed for std::io::Stdout {
53 type Unblocked = Unblock<Self>;
54 }
55 impl StdioSealed for std::io::Stderr {
56 type Unblocked = Unblock<Self>;
57 }
58}
59
60/// Extension trait that provides easy access to the corresponding asynchronous types for the
61/// streams in [`std::io`]
62pub trait StdioExt: sealed::StdioSealed {
63 /// Handle to the async version of the standard io stream.
64 type Handle;
65
66 /// Lock on the async handle to this async-wrapped/unblocked standard io stream.
67 type Locked<'lt>;
68
69 /// Lock on the async handle to this async-wrapped/unblocked standard io stream, wrapped again
70 /// so that the synchronous traits from [`std::io`] can be used on it.
71 type LockedAndSync<'lt>;
72
73 /// Get the handle to the global asynchronous version of this standard io stream as provided by
74 /// [`async_blocking_stdio`][crate].
75 fn async_handle() -> Self::Handle
76 where
77 Self::Handle: Sized;
78}
79
80/// Prelude module - includes the [`crate::StdioExt`] trait, mostly.
81pub mod prelude {
82 pub use super::StdioExt as _;
83}
84
85type SeqMutex<T> = async_lock::Mutex<T>;
86type SeqMutexGuard<'lt, T> = async_lock::MutexGuard<'lt, T>;
87
88/// Inner storage type that is stored exactly once. The type parameter is actually the full
89/// representation including any Unblock or BufReaders
90#[derive(Debug)]
91struct MutexUnblock<T: ?Sized>(pub SeqMutex<T>);
92
93impl<T: ?Sized> MutexUnblock<T> {
94 /// Lock it in an async context
95 #[inline]
96 pub async fn lock(&self) -> MutexUnblockGuard<'_, T> {
97 MutexUnblockGuard::new(self.0.lock().await)
98 }
99
100 /// Lock it in a sync context - do not use in async context or risk deadlocks!
101 #[inline]
102 pub fn lock_blocking(&self) -> MutexUnblockGuard<'_, T> {
103 MutexUnblockGuard::new(self.0.lock_blocking())
104 }
105
106 /// Attempt to lock it if available
107 #[inline]
108 pub fn try_lock(&self) -> Option<MutexUnblockGuard<'_, T>> {
109 self.0.try_lock().map(MutexUnblockGuard::new)
110 }
111}
112
113/// Wrapper type for making an async-locked [`blocking::Unblock`] Stdio stream. Parameter is one of
114/// the [`io::Stdout`]/[`io::Stderr`]/[`io::Stdin`] types plus any [`blocking::Unblock`] or other wrappers.
115/// This holds a static reference to an internal shared mutex. Some things may need outer
116/// BufRead/BufWrite attachments to make it match the equivalent sync trait impls - this is done as
117/// much as possible to match the stdio inside the stdlib.
118///
119/// Unlike the [`std::io`] unlocked-handles, this does not implement the various [`futures_lite::io`]
120/// traits. You need to use [`MutexUnblockHandle::lock`] or other locking methods to get access to a
121/// handle which can be used with these traits. Unlike the Standard Library, these handles are not
122/// re-entrant, which means that attempting to lock them in some inner future while the outer lock
123/// is held will cause a deadlock.
124///
125/// An important note is that [`Unblock`] implements [`Unpin`] unconditionally. This makes it easy to
126/// make generic implementations that will work nicely.
127#[derive(Debug)]
128pub struct MutexUnblockHandle<T: 'static + ?Sized>(&'static MutexUnblock<T>);
129
130impl<T: ?Sized> MutexUnblockHandle<T> {
131 /// Locks this handle asynchronously.
132 #[inline]
133 pub async fn lock(&self) -> MutexUnblockGuard<'static, T> {
134 self.0.lock().await
135 }
136
137 /// Lock this handle, but produce something that implements the synchronous [`std::io`]
138 /// traits.
139 #[inline]
140 pub async fn lock_into_sync(&self) -> futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>> {
141 futures_lite::io::BlockOn::new(self.lock().await)
142 }
143
144 /// Lock this handle in a sync context. Do not call in an async context or risk deadlocks!.
145 #[inline]
146 pub fn lock_blocking(&self) -> MutexUnblockGuard<'static, T> {
147 self.0.lock_blocking()
148 }
149
150 /// Lock this handle in a sync context, but producing something that implements the
151 /// synchronous [`std::io`] traits.
152 ///
153 /// Do not call in an async context or risk deadlocks!.
154 #[inline]
155 pub fn lock_blocking_into_sync(
156 &self,
157 ) -> futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>> {
158 futures_lite::io::BlockOn::new(self.lock_blocking())
159 }
160
161 /// Attempt to lock this handle if you can
162 #[inline]
163 pub fn try_lock(&self) -> Option<MutexUnblockGuard<'static, T>> {
164 self.0.try_lock()
165 }
166
167 /// Attempt to lock this handle and wrap it in something that can be used with the synchronous
168 /// [`std::io`] traits.
169 #[inline]
170 pub fn try_lock_into_sync(
171 &self,
172 ) -> Option<futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>>> {
173 self.try_lock().map(futures_lite::io::BlockOn::new)
174 }
175}
176
177/// Handle to the globally-synchronised structure for async standard input.
178pub type StdinUnblock = <io::Stdin as StdioExt>::Handle;
179
180/// Handle to the globally-synchronised structure for async standard output.
181pub type StdoutUnblock = <io::Stdout as StdioExt>::Handle;
182
183/// Handle to the globally-synchronised structure for async standard error.
184pub type StderrUnblock = <io::Stderr as StdioExt>::Handle;
185
186pin_project_lite::pin_project! {
187 /// Wrapper type indicating a held async-locked [`blocking::Unblock`] Stdio stream. Parameter is
188 /// the [`io::Stdout`]/[`io::Stderr`]/[`io::Stdin`] types.
189 ///
190 /// Using the various [`futures_lite::io`] traits on values of this type has all the same caveats
191 /// of using them on a [`blocking::Unblock`].
192 ///
193 /// This is not re-entrant. This means that if you hold this lock, while waiting for another
194 /// access to the same asynchronous-friendly mutex to be locked (e.g. if you create a future by
195 /// calling an async function, that waits for the corresponding mutex to be unlocked, and then
196 /// wait for that future to complete while holding the lock), it will cause a deadlock.
197 ///
198 /// This also deliberately does not expose any sort of `Deref` implementation, to avoid
199 /// exposing the internals of the type for public consumption.
200 #[derive(Debug)]
201 #[clippy::has_significant_drop]
202 pub struct MutexUnblockGuard<'lt, T: ?Sized> {
203 // This uses pin_project - the inner mutex guard needs to be pinned to access the internals in a
204 // pinned way.
205 #[pin] inner: SeqMutexGuard<'lt, T>
206 }
207}
208
209impl<'lt, T: ?Sized> MutexUnblockGuard<'lt, T> {
210 /// Internal construction function - not exposed as `From` because this is not a public API
211 #[inline]
212 fn new(inner: SeqMutexGuard<'lt, T>) -> Self {
213 Self { inner }
214 }
215}
216
217/// Lock on the global asynchronous accessor for Standard Input
218///
219/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
220/// to wait to lock this, then it will cause a deadlock.
221pub type StdinUnblockLock<'lt> = <std::io::Stdin as StdioExt>::Locked<'lt>;
222
223/// Lock on the global asynchronous accessor for Standard Output
224///
225/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
226/// to wait to lock this, then it will cause a deadlock.
227pub type StdoutUnblockLock<'lt> = <std::io::Stdout as StdioExt>::Locked<'lt>;
228
229/// Lock on the global asynchronous accessor for Standard Error
230///
231/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
232/// to wait to lock this, then it will cause a deadlock.
233pub type StderrUnblockLock<'lt> = <std::io::Stderr as StdioExt>::Locked<'lt>;
234
235impl StdioExt for std::io::Stdout {
236 type Handle = MutexUnblockHandle<Self::Unblocked>;
237
238 type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;
239
240 type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'static>>;
241
242 #[inline]
243 fn async_handle() -> Self::Handle
244 where
245 Self::Handle: Sized,
246 {
247 stdout()
248 }
249}
250
251impl StdioExt for std::io::Stderr {
252 type Handle = MutexUnblockHandle<Self::Unblocked>;
253
254 type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;
255
256 type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'static>>;
257
258 #[inline]
259 fn async_handle() -> Self::Handle
260 where
261 Self::Handle: Sized,
262 {
263 stderr()
264 }
265}
266
267impl StdioExt for std::io::Stdin {
268 type Handle = MutexUnblockHandle<Self::Unblocked>;
269
270 type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;
271
272 type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'static>>;
273
274 #[inline]
275 fn async_handle() -> Self::Handle
276 where
277 Self::Handle: Sized,
278 {
279 stdin()
280 }
281}
282
283pub(crate) mod cleanup {
284 use futures_lite::AsyncWriteExt as _;
285 use std::{io, sync};
286
287 use crate::{STDERR, STDOUT};
288 /// If [STDOUT] is initialised, this flushes the stream as a cleanup mechanism. This is important
289 /// because [Unblock] has an internal buffer separate from the inner stdout (which has a similar
290 /// cleanup mechanism).
291 ///
292 /// This function ensures flushing occurs no more than once, even if called multiple times.
293 fn cleanup_flush_stdout() {
294 static FLUSH_ONCE: sync::Once = sync::Once::new();
295 // It's important we use .get() here rather than simple deref, because if we did that, and
296 // the program exited during initialization, it could cause infinite loops, or deadlock, or
297 // something like that (as then it would try to initialize again).
298 if let Some(init_stdout) = STDOUT.get() {
299 FLUSH_ONCE.call_once(|| {
300 let mut init_stdout = init_stdout.lock_blocking();
301
302 let _result = nolocal_block_on::block_on(async {
303 loop {
304 match init_stdout.flush().await {
305 Ok(()) => break Ok(()),
306 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
307 Err(e) => break Err(e),
308 }
309 }
310 });
311 })
312 }
313 }
314
315 /// If [STDERR] is initialised, this flushes the stream as a cleanup mechanism. This is important
316 /// because [Unblock] has an internal buffer separate from the inner stderr. The inner stderr
317 /// itself is not buffered, though.
318 ///
319 /// This function ensures flushing occurs no more than once, even if called multiple times.
320 fn cleanup_flush_stderr() {
321 static FLUSH_ONCE: sync::Once = sync::Once::new();
322 // It's important we use .get() here rather than simple deref, because if we did that, and
323 // the program exited during initialization, it could cause infinite loops, or deadlock, or
324 // something like that (as then it would try to initialize again).
325 if let Some(init_stderr) = STDERR.get() {
326 FLUSH_ONCE.call_once(|| {
327 let mut init_stderr = init_stderr.lock_blocking();
328
329 let _result = nolocal_block_on::block_on(async {
330 loop {
331 match init_stderr.flush().await {
332 Ok(()) => break Ok(()),
333 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
334 Err(e) => break Err(e),
335 }
336 }
337 });
338 })
339 }
340 }
341
342 /// This invokes the [cleanup_flush_stdout] and [cleanup_flush_stderr] hooks in the correct order
343 /// (stderr is flushed first as it's likely to be most important).
344 pub(super) extern "C" fn cleanup() {
345 cleanup_flush_stderr();
346 cleanup_flush_stdout();
347 }
348}
349
350/// [`std::sync::LazyLock`] that registers the cleanup callbacks for [STDOUT] and [STDERR].
351///
352/// The actual value is a boolean indicating if it succeeded in registration or not.
353static ASYNC_STDIO_CLEANUP_REGISTERED: LazyLock<bool> = LazyLock::new(|| {
354 // SAFETY - cleanup functions include internal synchronisation to ensure cleanup does not occur
355 // more than once, even if libc ends up doing weird things.
356 unsafe { libc::atexit(cleanup::cleanup) == 0 }
357});
358
359static STDOUT: OnceLock<MutexUnblock<Unblock<io::Stdout>>> = OnceLock::new();
360
361/// Get the synchronised [`blocking::Unblock`]-ed stdout. A best-effort attempt is made to flush
362/// the asynchronous standard output upon program exit (including things stored in the pipe it uses
363/// to do async things).
364///
365/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
366/// future/task that is being waited for. If you try, it will cause a deadlock.
367#[must_use]
368pub fn stdout() -> StdoutUnblock {
369 MutexUnblockHandle(STDOUT.get_or_init(|| {
370 let r = MutexUnblock(SeqMutex::new(Unblock::new(io::stdout())));
371 if !(*ASYNC_STDIO_CLEANUP_REGISTERED) {
372 panic!("could not register async stdio cleanup functions")
373 }
374 r
375 }))
376}
377
378static STDERR: OnceLock<MutexUnblock<Unblock<io::Stderr>>> = OnceLock::new();
379
380/// Get the synchronised [`blocking::Unblock`]-ed stderr. A best-effort attempt is made to flush
381/// the asynchronous standard error upon program exit (including things stored in the pipe it uses
382/// to do async things).
383///
384/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
385/// future/task that is being waited for. If you try, it will cause a deadlock.
386#[must_use]
387pub fn stderr() -> StderrUnblock {
388 MutexUnblockHandle(STDERR.get_or_init(|| {
389 let r = MutexUnblock(SeqMutex::new(Unblock::new(io::stderr())));
390 if !(*ASYNC_STDIO_CLEANUP_REGISTERED) {
391 panic!("could not register async stdio cleanup functions");
392 }
393 r
394 }))
395}
396
397static STDIN: OnceLock<MutexUnblock<BufReader<Unblock<io::Stdin>>>> = OnceLock::new();
398
399/// Get the synchronised [`blocking::Unblock`]-ed stdin
400///
401/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
402/// future/task that is being waited for. If you try, it will cause a deadlock.
403#[must_use]
404pub fn stdin() -> StdinUnblock {
405 MutexUnblockHandle(
406 STDIN
407 .get_or_init(|| MutexUnblock(SeqMutex::new(BufReader::new(Unblock::new(io::stdin()))))),
408 )
409}
410
411impl<T: ?Sized> MutexUnblockGuard<'_, T> {
412 /// Get the pinned form of the stored, synchronised value from this guard structure.
413 ///
414 /// This is not public, such as to avoid leaking `T`
415 #[inline]
416 pub(crate) fn inner_pinned(self: Pin<&mut Self>) -> Pin<&mut T>
417 where
418 T: Unpin,
419 {
420 Pin::new(self.project().inner.get_mut())
421 }
422}
423
424// MANUAL LOCK IMPLS
425impl<T: AsyncRead + Unpin + ?Sized> AsyncRead for MutexUnblockGuard<'_, T> {
426 #[inline]
427 fn poll_read(
428 self: std::pin::Pin<&mut Self>,
429 cx: &mut std::task::Context<'_>,
430 buf: &mut [u8],
431 ) -> std::task::Poll<io::Result<usize>> {
432 self.inner_pinned().poll_read(cx, buf)
433 }
434
435 #[inline]
436 fn poll_read_vectored(
437 self: std::pin::Pin<&mut Self>,
438 cx: &mut std::task::Context<'_>,
439 bufs: &mut [io::IoSliceMut<'_>],
440 ) -> std::task::Poll<io::Result<usize>> {
441 self.inner_pinned().poll_read_vectored(cx, bufs)
442 }
443}
444
445impl<T: AsyncBufRead + Unpin + ?Sized> AsyncBufRead for MutexUnblockGuard<'_, T> {
446 #[inline]
447 fn poll_fill_buf(
448 self: Pin<&mut Self>,
449 cx: &mut std::task::Context<'_>,
450 ) -> std::task::Poll<io::Result<&[u8]>> {
451 self.inner_pinned().poll_fill_buf(cx)
452 }
453
454 #[inline]
455 fn consume(self: Pin<&mut Self>, amt: usize) {
456 self.inner_pinned().consume(amt)
457 }
458}
459
460impl<T: AsyncSeek + Unpin + ?Sized> AsyncSeek for MutexUnblockGuard<'_, T> {
461 #[inline]
462 fn poll_seek(
463 self: Pin<&mut Self>,
464 cx: &mut std::task::Context<'_>,
465 pos: io::SeekFrom,
466 ) -> std::task::Poll<io::Result<u64>> {
467 self.inner_pinned().poll_seek(cx, pos)
468 }
469}
470
471impl<T: AsyncWrite + Unpin + ?Sized> AsyncWrite for MutexUnblockGuard<'_, T> {
472 #[inline]
473 fn poll_write(
474 self: Pin<&mut Self>,
475 cx: &mut std::task::Context<'_>,
476 buf: &[u8],
477 ) -> std::task::Poll<io::Result<usize>> {
478 self.inner_pinned().poll_write(cx, buf)
479 }
480
481 #[inline]
482 fn poll_flush(
483 self: Pin<&mut Self>,
484 cx: &mut std::task::Context<'_>,
485 ) -> std::task::Poll<io::Result<()>> {
486 self.inner_pinned().poll_flush(cx)
487 }
488
489 #[inline]
490 fn poll_close(
491 self: Pin<&mut Self>,
492 cx: &mut std::task::Context<'_>,
493 ) -> std::task::Poll<io::Result<()>> {
494 self.inner_pinned().poll_close(cx)
495 }
496
497 #[inline]
498 fn poll_write_vectored(
499 self: Pin<&mut Self>,
500 cx: &mut std::task::Context<'_>,
501 bufs: &[io::IoSlice<'_>],
502 ) -> std::task::Poll<io::Result<usize>> {
503 self.inner_pinned().poll_write_vectored(cx, bufs)
504 }
505}
506
507impl<T: Stream + Unpin + ?Sized> Stream for MutexUnblockGuard<'_, T> {
508 type Item = T::Item;
509
510 #[inline]
511 fn poll_next(
512 self: Pin<&mut Self>,
513 cx: &mut std::task::Context<'_>,
514 ) -> std::task::Poll<Option<Self::Item>> {
515 self.inner_pinned().poll_next(cx)
516 }
517
518 #[inline]
519 fn size_hint(&self) -> (usize, Option<usize>) {
520 self.inner.size_hint()
521 }
522}
523
524// async-blocking-stdio - std::io::std{in(), out(), err()}, but async
525// Copyright (C) 2024 Matti Bryce <mattibryce at protonmail dot com>
526//
527// This program is free software: you can redistribute it and/or modify
528// it under the terms of the GNU General Public License as published by
529// the Free Software Foundation, either version 3 of the License, or
530// (at your option) any later version.
531//
532// This program is distributed in the hope that it will be useful,
533// but WITHOUT ANY WARRANTY; without even the implied warranty of
534// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
535// GNU General Public License for more details.
536//
537// You should have received a copy of the GNU General Public License
538// along with this program. If not, see <https://www.gnu.org/licenses/>.