json_streaming/nonblocking/
io.rs

1use async_trait::async_trait;
2use core::error::Error;
3
4/// [NonBlockingWrite] is the library's abstraction for non-blocking write I/O.
5///
6/// It is similar to `tokio::io::AsyncWrite`, and there is a blanket implementation of
7///  [NonBlockingWrite] for any implementation of `AsyncWrite`. The reason for introducing
8///  [NonBlockingWrite] is that it decouples json-streaming from tokio and allows it to be used
9///  with other async frameworks.
10///
11/// Note that json-streaming writes data to a [NonBlockingWrite] in many small chunks without any
12///  I/O buffering. It is the client's responsibility to add buffering for improved performance
13///  where desired.
14#[async_trait]
15pub trait NonBlockingWrite {
16    type Error: Error;
17
18    async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error>;
19}
20
21#[cfg(all(test, not(feature="tokio")))]
22#[async_trait]
23impl NonBlockingWrite for Vec<u8> {
24    type Error = std::io::Error;
25
26    async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
27        self.extend_from_slice(buf);
28        Ok(())
29    }
30}
31
32#[cfg(feature = "tokio")]
33/// Blanket implementation that allows any [tokio::io::AsyncWrite] implementation to be used
34///  seamlessly as [NonBlockingWrite].
35#[async_trait]
36impl <W: tokio::io::AsyncWrite + Unpin + Send> NonBlockingWrite for W {
37    type Error = std::io::Error;
38
39    async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
40        tokio::io::AsyncWriteExt::write_all(self, buf).await
41    }
42}
43
44
45#[async_trait]
46/// [NonBlockingRead] is the library's abstraction for non-blocking read I/O.
47///
48/// It is similar to `tokio:io::AsyncRead`, and there is a blanket implementation of
49///  [NonBlockingRead] for any implementation of `AsyncRead`. The reason for introducing
50///  [NonBlockingRead] is to decouple json-streaming from tokio and allow it to be used with
51///  other async frameworks.
52///
53/// Note that json-streaming reads data from a [NonBlockingRead] in many small chunks without any
54///  I/O buffering. It is the client's responsibility to add buffering for improved performance
55///  where desired.
56pub trait NonBlockingRead {
57    type Error: Error;
58
59    async fn read(&mut self) -> Result<Option<u8>, Self::Error>;
60}
61
62#[cfg(all(test, not(feature = "tokio")))]
63#[async_trait]
64impl NonBlockingRead for std::io::Cursor<Vec<u8>> {
65    type Error = std::io::Error;
66
67    async fn read(&mut self) -> Result<Option<u8>, Self::Error> {
68        let mut result = [0u8; 1];
69        let num_read = std::io::Read::read(self, &mut result)?;
70        if num_read == 1 {
71            Ok(Some(result[0]))
72        }
73        else {
74            Ok(None)
75        }
76    }
77}
78
79#[cfg(feature = "tokio")]
80/// Blanket implementation that allows any [tokio::io::AsyncRead] implementation to be used
81///  seamlessly as [NonBlockingRead].
82#[async_trait]
83impl <R: tokio::io::AsyncRead + Unpin + Send> NonBlockingRead for R {
84    type Error = std::io::Error;
85
86    async fn read(&mut self) -> Result<Option<u8>, Self::Error> {
87        let mut result = [0u8; 1];
88        let num_read = tokio::io::AsyncReadExt::read(self, &mut result).await?;
89        if num_read == 1 {
90            Ok(Some(result[0]))
91        }
92        else {
93            Ok(None)
94        }
95    }
96}