1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
//! [stdio]: `std::io`
//!
//! [stdstdout]: `std::io::Stdout`
//! [stdstderr]: `std::io::Stderr`
//! [stdstdin]: `std::io::Stdin`
//!
//! [stdstdoutlock]: `std::io::StdoutLock`
//! [stdstderrlock]: `std::io::StderrLock`
//! [stdstdinlock]: `std::io::StdinLock`
//!
//! [send-trait]: `core::marker::Send`
//!
//! [blocking-unblock]: `blocking::Unblock`
//! [fl-block-on]: `futures_lite::io::BlockOn`
//! [fl-io]: `futures_lite::io`
//!
//! [selfstdoutstruct]: `StdoutUnblock`
//! [selfstderrstruct]: `StderrUnblock`
//! [selfstdinstruct]: `StdinUnblock`
//!
//! [selfstdoutfn]: `stdout`
//! [selfstderrfn]: `stderr`
//! [selfstdinfn]: `stdin`
#![doc = include_str!("../README.md")]
use blocking::Unblock;
use core::pin::Pin;
use futures_lite::{io::BufReader, AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, Stream};
use std::{
    io,
    sync::{LazyLock, OnceLock},
};

mod sealed {
    use blocking::Unblock;
    use futures_lite::io::BufReader;

    /// A [`std::io`] standard io stream.
    ///
    /// The trait is public but lives in this private module, and is used as the supertrait of
    /// the crate's public extension trait - the usual "sealed trait" arrangement.
    pub trait StdioSealed {
        /// Inner implementation on how, exactly, to actually unblock the stream.
        ///
        /// Some streams require extra wrappers to properly unblock them while retaining
        /// full efficiency - this provides that.
        type Unblocked;
    }

    impl StdioSealed for std::io::Stdin {
        /// We need [`BufReader`] here - [`Unblock`] does not implement
        /// [`futures_lite::AsyncBufRead`], because it would need to implement special operation
        /// modes. To get back the buffering, we use a [`BufReader`] - the async form - outside
        /// it.
        type Unblocked = BufReader<Unblock<Self>>;
    }
    impl StdioSealed for std::io::Stdout {
        /// Standard output only needs the plain [`Unblock`] wrapper.
        type Unblocked = Unblock<Self>;
    }
    impl StdioSealed for std::io::Stderr {
        /// Standard error only needs the plain [`Unblock`] wrapper.
        type Unblocked = Unblock<Self>;
    }
}

/// Extension trait that provides easy access to the corresponding asynchronous types for the
/// streams in [`std::io`].
///
/// It requires the `StdioSealed` trait from the private `sealed` module as a supertrait; this
/// file implements it for [`std::io::Stdin`], [`std::io::Stdout`] and [`std::io::Stderr`].
pub trait StdioExt: sealed::StdioSealed {
    /// Handle to the async version of the standard io stream.
    type Handle;

    /// Lock on the async handle to this async-wrapped/unblocked standard io stream.
    ///
    /// `'lt` is the lifetime for which the lock borrows the underlying shared state.
    type Locked<'lt>;

    /// Lock on the async handle to this async-wrapped/unblocked standard io stream, wrapped again
    /// so that the synchronous traits from [`std::io`] can be used on it.
    type LockedAndSync<'lt>;

    /// Get the handle to the global asynchronous version of this standard io stream as provided by
    /// [`async_blocking_stdio`][crate].
    fn async_handle() -> Self::Handle
    where
        Self::Handle: Sized;
}

/// Prelude module - includes the [`super::StdioExt`] trait, mostly.
///
/// The trait is re-exported anonymously (`as _`), so importing it from here makes its
/// associated items usable without binding the name `StdioExt` in the importing scope.
pub mod prelude {
    pub use super::StdioExt as _;
}

/// Mutex type used to sequence access to each wrapped standard stream.
type SeqMutex<T> = async_lock::Mutex<T>;
/// Guard type produced when a [`SeqMutex`] is locked.
type SeqMutexGuard<'lt, T> = async_lock::MutexGuard<'lt, T>;

/// Inner storage type that is stored exactly once. The type parameter is actually the full
/// representation, including any `Unblock` or `BufReader` wrappers.
///
/// This is a thin newtype over [`SeqMutex`]; its locking methods hand out
/// [`MutexUnblockGuard`]s rather than the raw mutex guard.
#[derive(Debug)]
struct MutexUnblock<T: ?Sized>(pub SeqMutex<T>);

impl<T: ?Sized> MutexUnblock<T> {
    /// Acquire the lock from an async context, awaiting until it can be taken.
    #[inline]
    pub async fn lock(&self) -> MutexUnblockGuard<'_, T> {
        let raw_guard = self.0.lock().await;
        MutexUnblockGuard::new(raw_guard)
    }

    /// Acquire the lock from a synchronous context.
    ///
    /// Do not use this in an async context, or you risk deadlocks!
    #[inline]
    pub fn lock_blocking(&self) -> MutexUnblockGuard<'_, T> {
        let raw_guard = self.0.lock_blocking();
        MutexUnblockGuard::new(raw_guard)
    }

    /// Take the lock only if it is available right now; otherwise return `None`.
    #[inline]
    pub fn try_lock(&self) -> Option<MutexUnblockGuard<'_, T>> {
        let raw_guard = self.0.try_lock()?;
        Some(MutexUnblockGuard::new(raw_guard))
    }
}

/// Wrapper type for making an async-locked [`blocking::Unblock`] Stdio stream. Parameter is one of
/// the [`io::Stdout`]/[`io::Stderr`]/[`io::Stdin`] types plus any [`blocking::Unblock`] or other
/// wrappers. This holds a static reference to an internal shared mutex. Some things may need outer
/// buffering wrappers to make them match the equivalent sync trait impls - this is done as much as
/// possible to match the stdio inside the stdlib.
///
/// Unlike the [`std::io`] unlocked-handles, this does not implement the various [`futures_lite::io`]
/// traits. You need to use [`MutexUnblockHandle::lock`] or other locking methods to get access to a
/// handle which can be used with these traits. Unlike the Standard Library, these handles are not
/// re-entrant, which means that attempting to lock them in some inner future while the outer lock
/// is held will cause a deadlock.
///
/// An important note is that [`Unblock`] implements [`Unpin`] unconditionally. This makes it easy to
/// make generic implementations that will work nicely.
#[derive(Debug)]
pub struct MutexUnblockHandle<T: 'static + ?Sized>(&'static MutexUnblock<T>);

impl<T: ?Sized> MutexUnblockHandle<T> {
    /// Locks this handle asynchronously.
    #[inline]
    pub async fn lock(&self) -> MutexUnblockGuard<'static, T> {
        self.0.lock().await
    }

    /// Lock this handle, but produce something that implements the synchronous [`std::io`]
    /// traits.
    #[inline]
    pub async fn lock_into_sync(&self) -> futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>> {
        futures_lite::io::BlockOn::new(self.lock().await)
    }

    /// Lock this handle in a sync context. Do not call in an async context or risk deadlocks!.
    #[inline]
    pub fn lock_blocking(&self) -> MutexUnblockGuard<'static, T> {
        self.0.lock_blocking()
    }

    /// Lock this handle in a sync context, but producing something that implements the
    /// synchronous [`std::io`] traits.
    ///
    /// Do not call in an async context or risk deadlocks!.
    #[inline]
    pub fn lock_blocking_into_sync(
        &self,
    ) -> futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>> {
        futures_lite::io::BlockOn::new(self.lock_blocking())
    }

    /// Attempt to lock this handle if you can
    #[inline]
    pub fn try_lock(&self) -> Option<MutexUnblockGuard<'static, T>> {
        self.0.try_lock()
    }

    /// Attempt to lock this handle and wrap it in something that can be used with the synchronous
    /// [`std::io`] traits.
    #[inline]
    pub fn try_lock_into_sync(
        &self,
    ) -> Option<futures_lite::io::BlockOn<MutexUnblockGuard<'static, T>>> {
        self.try_lock().map(futures_lite::io::BlockOn::new)
    }
}

/// Handle to the globally-synchronised structure for async standard input.
///
/// This is the `Handle` associated type of the [`StdioExt`] implementation for [`io::Stdin`].
pub type StdinUnblock = <io::Stdin as StdioExt>::Handle;

/// Handle to the globally-synchronised structure for async standard output.
///
/// This is the `Handle` associated type of the [`StdioExt`] implementation for [`io::Stdout`].
pub type StdoutUnblock = <io::Stdout as StdioExt>::Handle;

/// Handle to the globally-synchronised structure for async standard error.
///
/// This is the `Handle` associated type of the [`StdioExt`] implementation for [`io::Stderr`].
pub type StderrUnblock = <io::Stderr as StdioExt>::Handle;

pin_project_lite::pin_project! {
    /// Wrapper type indicating a held async-locked [`blocking::Unblock`] Stdio stream. Parameter is
    /// one of the [`io::Stdout`]/[`io::Stderr`]/[`io::Stdin`] types, together with whatever
    /// wrappers were applied to unblock it.
    ///
    /// Using the various [`futures_lite::io`] traits on values of this type has all the same caveats
    /// of using them on a [`blocking::Unblock`].
    ///
    /// This is not re-entrant. This means that if you hold this lock, while waiting for another
    /// access to the same asynchronous-friendly mutex to be locked (e.g. if you create a future by
    /// calling an async function, that waits for the corresponding mutex to be unlocked, and then
    /// wait for that future to complete while holding the lock), it will cause a deadlock.
    ///
    /// This also deliberately does not expose any sort of `Deref` implementation, to avoid
    /// exposing the internals of the type for public consumption.
    #[derive(Debug)]
    #[clippy::has_significant_drop]
    pub struct MutexUnblockGuard<'lt, T: ?Sized> {
        // This uses pin_project - the inner mutex guard is marked as pinned so that the wrapped
        // value can be reached through a pinned reference to this guard.
        #[pin] inner: SeqMutexGuard<'lt, T>
    }
}

impl<'lt, T: ?Sized> MutexUnblockGuard<'lt, T> {
    /// Internal construction function - not exposed as `From` because this is not a public API
    #[inline]
    fn new(inner: SeqMutexGuard<'lt, T>) -> Self {
        Self { inner }
    }
}

/// Lock on the global asynchronous accessor for Standard Input.
///
/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
/// to wait to lock this, then it will cause a deadlock.
pub type StdinUnblockLock<'lt> = <std::io::Stdin as StdioExt>::Locked<'lt>;

/// Lock on the global asynchronous accessor for Standard Output.
///
/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
/// to wait to lock this, then it will cause a deadlock.
pub type StdoutUnblockLock<'lt> = <std::io::Stdout as StdioExt>::Locked<'lt>;

/// Lock on the global asynchronous accessor for Standard Error.
///
/// This lock is not reentrant. If you hold this lock, and then wait for something else that needs
/// to wait to lock this, then it will cause a deadlock.
pub type StderrUnblockLock<'lt> = <std::io::Stderr as StdioExt>::Locked<'lt>;

impl StdioExt for std::io::Stdout {
    /// Handle to the shared, mutex-protected async standard output.
    type Handle = MutexUnblockHandle<Self::Unblocked>;

    /// Guard over the unblocked standard output, borrowing the shared state for `'lt`.
    type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;

    /// [`Self::Locked`] wrapped in [`futures_lite::io::BlockOn`], so the synchronous
    /// [`std::io`] traits can be used on it.
    ///
    /// The lifetime parameter is forwarded to [`Self::Locked`] rather than being ignored;
    /// `LockedAndSync<'static>` is the type the handle's `*_into_sync` methods return.
    type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'lt>>;

    /// Returns the same handle as the free function [`stdout`].
    #[inline]
    fn async_handle() -> Self::Handle
    where
        Self::Handle: Sized,
    {
        stdout()
    }
}

impl StdioExt for std::io::Stderr {
    /// Handle to the shared, mutex-protected async standard error.
    type Handle = MutexUnblockHandle<Self::Unblocked>;

    /// Guard over the unblocked standard error, borrowing the shared state for `'lt`.
    type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;

    /// [`Self::Locked`] wrapped in [`futures_lite::io::BlockOn`], so the synchronous
    /// [`std::io`] traits can be used on it.
    ///
    /// The lifetime parameter is forwarded to [`Self::Locked`] rather than being ignored;
    /// `LockedAndSync<'static>` is the type the handle's `*_into_sync` methods return.
    type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'lt>>;

    /// Returns the same handle as the free function [`stderr`].
    #[inline]
    fn async_handle() -> Self::Handle
    where
        Self::Handle: Sized,
    {
        stderr()
    }
}

impl StdioExt for std::io::Stdin {
    /// Handle to the shared, mutex-protected async standard input.
    type Handle = MutexUnblockHandle<Self::Unblocked>;

    /// Guard over the unblocked standard input, borrowing the shared state for `'lt`.
    type Locked<'lt> = MutexUnblockGuard<'lt, Self::Unblocked>;

    /// [`Self::Locked`] wrapped in [`futures_lite::io::BlockOn`], so the synchronous
    /// [`std::io`] traits can be used on it.
    ///
    /// The lifetime parameter is forwarded to [`Self::Locked`] rather than being ignored;
    /// `LockedAndSync<'static>` is the type the handle's `*_into_sync` methods return.
    type LockedAndSync<'lt> = futures_lite::io::BlockOn<Self::Locked<'lt>>;

    /// Returns the same handle as the free function [`stdin`].
    #[inline]
    fn async_handle() -> Self::Handle
    where
        Self::Handle: Sized,
    {
        stdin()
    }
}

pub(crate) mod cleanup {
    use futures_lite::AsyncWriteExt as _;
    use std::{io, sync};

    use crate::{STDERR, STDOUT};
    /// If [STDOUT] is initialised, this flushes the stream as a cleanup mechanism. This is important
    /// because [Unblock] has an internal buffer separate from the inner stdout (which has a similar
    /// cleanup mechanism).
    ///
    /// This function ensures flushing occurs no more than once, even if called multiple times.
    fn cleanup_flush_stdout() {
        static FLUSH_ONCE: sync::Once = sync::Once::new();
        // It's important  we use .get() here rather than simple deref, because if we did that, and
        // the program exited during initialization, it could cause infinite loops, or deadlock, or
        // something like that (as then it would try to initialize again).
        if let Some(init_stdout) = STDOUT.get() {
            FLUSH_ONCE.call_once(|| {
                let mut init_stdout = init_stdout.lock_blocking();

                let _result = nolocal_block_on::block_on(async {
                    loop {
                        match init_stdout.flush().await {
                            Ok(()) => break Ok(()),
                            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                            Err(e) => break Err(e),
                        }
                    }
                });
            })
        }
    }

    /// If [STDERR] is initialised, this flushes the stream as a cleanup mechanism. This is important
    /// because [Unblock] has an internal buffer separate from the inner stderr. The inner stderr
    /// itself is not buffered, though.
    ///
    /// This function ensures flushing occurs no more than once, even if called multiple times.
    fn cleanup_flush_stderr() {
        static FLUSH_ONCE: sync::Once = sync::Once::new();
        // It's important  we use .get() here rather than simple deref, because if we did that, and
        // the program exited during initialization, it could cause infinite loops, or deadlock, or
        // something like that (as then it would try to initialize again).
        if let Some(init_stderr) = STDERR.get() {
            FLUSH_ONCE.call_once(|| {
                let mut init_stderr = init_stderr.lock_blocking();

                let _result = nolocal_block_on::block_on(async {
                    loop {
                        match init_stderr.flush().await {
                            Ok(()) => break Ok(()),
                            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                            Err(e) => break Err(e),
                        }
                    }
                });
            })
        }
    }

    /// This invokes the [cleanup_flush_stdout] and [cleanup_flush_stderr] hooks in the correct order
    /// (stderr is flushed first as it's likely to be most important).
    pub(super) extern "C" fn cleanup() {
        cleanup_flush_stderr();
        cleanup_flush_stdout();
    }
}

/// Lazily registers the exit-time cleanup callback that flushes `STDOUT` and `STDERR`.
///
/// The stored value is `true` when `libc::atexit` reported success (returned `0`), and `false`
/// otherwise.
static ASYNC_STDIO_CLEANUP_REGISTERED: LazyLock<bool> = LazyLock::new(|| {
    // SAFETY: the cleanup functions include internal synchronisation to ensure cleanup does not
    // occur more than once, even if libc ends up doing weird things.
    let status = unsafe { libc::atexit(cleanup::cleanup) };
    status == 0
});

/// Process-wide storage for the unblocked standard output; filled on first use by [`stdout`].
static STDOUT: OnceLock<MutexUnblock<Unblock<io::Stdout>>> = OnceLock::new();

/// Get the synchronised [`blocking::Unblock`]-ed stdout. A best-effort attempt is made to flush
/// the asynchronous standard output upon program exit (including things stored in the pipe it uses
/// to do async things).
///
/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
/// future/task that is being waited for. If you try, it will cause a deadlock.
///
/// # Panics
///
/// Panics during first-time initialisation if the exit-time cleanup function could not be
/// registered.
#[must_use]
pub fn stdout() -> StdoutUnblock {
    let shared = STDOUT.get_or_init(|| {
        let wrapped = MutexUnblock(SeqMutex::new(Unblock::new(io::stdout())));
        assert!(
            *ASYNC_STDIO_CLEANUP_REGISTERED,
            "could not register async stdio cleanup functions"
        );
        wrapped
    });
    MutexUnblockHandle(shared)
}

/// Process-wide storage for the unblocked standard error; filled on first use by [`stderr`].
static STDERR: OnceLock<MutexUnblock<Unblock<io::Stderr>>> = OnceLock::new();

/// Get the synchronised [`blocking::Unblock`]-ed stderr. A best-effort attempt is made to flush
/// the asynchronous standard error upon program exit (including things stored in the pipe it uses
/// to do async things).
///
/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
/// future/task that is being waited for. If you try, it will cause a deadlock.
///
/// # Panics
///
/// Panics during first-time initialisation if the exit-time cleanup function could not be
/// registered.
#[must_use]
pub fn stderr() -> StderrUnblock {
    let shared = STDERR.get_or_init(|| {
        let wrapped = MutexUnblock(SeqMutex::new(Unblock::new(io::stderr())));
        assert!(
            *ASYNC_STDIO_CLEANUP_REGISTERED,
            "could not register async stdio cleanup functions"
        );
        wrapped
    });
    MutexUnblockHandle(shared)
}

/// Process-wide storage for the unblocked, buffered standard input; filled on first use by
/// [`stdin`].
static STDIN: OnceLock<MutexUnblock<BufReader<Unblock<io::Stdin>>>> = OnceLock::new();

/// Get the synchronised [`blocking::Unblock`]-ed stdin.
///
/// The returned handle cannot be locked in a re-entrant fashion while the lock is held, within a
/// future/task that is being waited for. If you try, it will cause a deadlock.
#[must_use]
pub fn stdin() -> StdinUnblock {
    let shared = STDIN.get_or_init(|| {
        let buffered = BufReader::new(Unblock::new(io::stdin()));
        MutexUnblock(SeqMutex::new(buffered))
    });
    MutexUnblockHandle(shared)
}

impl<T: ?Sized> MutexUnblockGuard<'_, T> {
    /// Project a pinned guard to a pinned mutable reference to the value it protects.
    ///
    /// Crate-private on purpose, so that `T` does not leak into the public API.
    #[inline]
    pub(crate) fn inner_pinned<'s>(self: Pin<&'s mut Self>) -> Pin<&mut T>
    where
        T: Unpin,
    {
        // Step from the pinned wrapper to the underlying mutex guard...
        let guard = self.project().inner.get_mut();
        // ...then through the guard to the protected value; re-pinning needs `T: Unpin`.
        Pin::new(&mut **guard)
    }
}

// MANUAL LOCK IMPLS
impl<T: AsyncRead + Unpin + ?Sized> AsyncRead for MutexUnblockGuard<'_, T> {
    #[inline]
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<io::Result<usize>> {
        self.inner_pinned().poll_read(cx, buf)
    }

    #[inline]
    fn poll_read_vectored(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        bufs: &mut [io::IoSliceMut<'_>],
    ) -> std::task::Poll<io::Result<usize>> {
        self.inner_pinned().poll_read_vectored(cx, bufs)
    }
}

/// Buffered reading through the guard forwards directly to the locked value.
impl<T: AsyncBufRead + Unpin + ?Sized> AsyncBufRead for MutexUnblockGuard<'_, T> {
    #[inline]
    fn poll_fill_buf(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<io::Result<&[u8]>> {
        let target = self.inner_pinned();
        T::poll_fill_buf(target, cx)
    }

    #[inline]
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let target = self.inner_pinned();
        T::consume(target, amt)
    }
}

/// Seeking through the guard forwards directly to the locked value.
impl<T: AsyncSeek + Unpin + ?Sized> AsyncSeek for MutexUnblockGuard<'_, T> {
    #[inline]
    fn poll_seek(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        pos: io::SeekFrom,
    ) -> std::task::Poll<io::Result<u64>> {
        let target = self.inner_pinned();
        T::poll_seek(target, cx, pos)
    }
}

/// Writing through the guard forwards directly to the locked value.
impl<T: AsyncWrite + Unpin + ?Sized> AsyncWrite for MutexUnblockGuard<'_, T> {
    #[inline]
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<io::Result<usize>> {
        let target = self.inner_pinned();
        T::poll_write(target, cx, buf)
    }

    #[inline]
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        let target = self.inner_pinned();
        T::poll_flush(target, cx)
    }

    #[inline]
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        let target = self.inner_pinned();
        T::poll_close(target, cx)
    }

    #[inline]
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> std::task::Poll<io::Result<usize>> {
        let target = self.inner_pinned();
        T::poll_write_vectored(target, cx, bufs)
    }
}

/// Streaming through the guard forwards directly to the locked value.
impl<T: Stream + Unpin + ?Sized> Stream for MutexUnblockGuard<'_, T> {
    type Item = T::Item;

    #[inline]
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let target = self.inner_pinned();
        T::poll_next(target, cx)
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Explicitly go through the mutex guard to reach the locked stream.
        T::size_hint(&*self.inner)
    }
}

// async-blocking-stdio - std::io::std{in(), out(), err()}, but async
// Copyright (C) 2024  Matti Bryce <mattibryce at protonmail dot com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.