fixed_buffer_tokio/
lib.rs

//! # NOT MAINTAINED
//! This crate was replaced by the
//! [`fixed-buffer`](https://crates.io/crates/fixed-buffer) crate's `tokio` feature
//! and [`read-write-ext-tokio`](https://crates.io/crates/read-write-ext-tokio).
5//!
6//! ----
7//!
8//! [![crates.io version](https://img.shields.io/crates/v/fixed-buffer-tokio.svg)](https://crates.io/crates/fixed-buffer-tokio)
9//! [![license: Apache 2.0](https://gitlab.com/leonhard-llc/fixed-buffer-rs/-/raw/main/license-apache-2.0.svg)](http://www.apache.org/licenses/LICENSE-2.0)
10//! [![unsafe forbidden](https://gitlab.com/leonhard-llc/fixed-buffer-rs/-/raw/main/unsafe-forbidden-success.svg)](https://github.com/rust-secure-code/safety-dance/)
11//! [![pipeline status](https://gitlab.com/leonhard-llc/fixed-buffer-rs/badges/main/pipeline.svg)](https://gitlab.com/leonhard-llc/fixed-buffer-rs/-/pipelines)
12//!
13//! This is a Rust library with fixed-size buffers,
14//! useful for network protocol parsers and file parsers.
15//!
16//! This is the tokio async version of [`fixed-buffer`](https://crates.io/crates/fixed-buffer).
17//!
18//! # Features
19//! - Write bytes to the buffer and read them back
20//! - Use it to read a stream, search for a delimiter,
21//!   and save leftover bytes for the next read.
22//! - Does not allocate memory
23//! - Depends only on
24//!   [`std`](https://doc.rust-lang.org/stable/std/),
25//!   [`tokio`](https://crates.io/crates/tokio), and
26//!   [`fixed-buffer`](https://crates.io/crates/fixed-buffer).
27//! - No macros
28//! - Good test coverage (98%)
29//! - `forbid(unsafe_code)`
30//!
//! # Documentation
//! <https://docs.rs/fixed-buffer-tokio>
33//!
34//! # Examples
35//! For a complete example, see
36//! [`tests/server.rs`](https://gitlab.com/leonhard-llc/fixed-buffer-rs/-/blob/main/fixed-buffer-tokio/tests/server.rs).
37//!
38//! # Alternatives
39//! - [tokio::io::BufReader](https://docs.rs/tokio/latest/tokio/io/struct.BufReader.html)
40//! - [tokio::io::BufWriter](https://docs.rs/tokio/latest/tokio/io/struct.BufWriter.html)
41//!
42//! # Changelog
43//! - v0.3.4 - Update "not maintained" message.
44//! - v0.3.3 - No longer maintained.
45//! - v0.3.2 - Update docs
46//! - v0.3.1 - Support Tokio 1
47//! - v0.3.0 - Breaking API changes:
48//!   - Change type parameter to const buffer size. Example: `FixedBuf<1024>`.
49//!   - Remove `new` arg.
50//!   - Remove `capacity`.
51//!   - Change `writable` return type to `&mut [u8]`.
52//! - v0.1.1 - Add badges to readme
53//! - v0.1.0 - First published version
54//!
55//! # Release Process
56//! 1. Edit `Cargo.toml` and bump version number.
57//! 1. Run `../release.sh`
58#![forbid(unsafe_code)]
59
60use core::pin::Pin;
61use core::task::{Context, Poll};
62use fixed_buffer::{FixedBuf, MalformedInputError};
63
64mod async_read_write_chain;
65pub use async_read_write_chain::*;
66
67mod async_read_write_take;
68pub use async_read_write_take::*;
69
70#[cfg(test)]
71mod test_utils;
72#[cfg(test)]
73pub use test_utils::*;
74
75/// A newtype that wraps
76/// [`FixedBuf`](https://docs.rs/fixed-buffer/latest/fixed_buffer/struct.FixedBuf.html)
77/// and implements
78/// [`tokio::io::AsyncRead`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html)
79/// and
80/// [`tokio::io::AsyncWrite`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncWrite.html).
81///
82/// It also has async versions of FixedBuf's io functions.
83pub struct AsyncFixedBuf<const SIZE: usize>(FixedBuf<SIZE>);
84
85impl<const SIZE: usize> AsyncFixedBuf<SIZE> {
86    /// Creates a new FixedBuf and wraps it in an AsyncFixedBuf.
87    ///
88    /// See
89    /// [`FixedBuf::new`](https://docs.rs/fixed-buffer/latest/fixed_buffer/struct.FixedBuf.html#method.new)
90    /// for details.
91    pub const fn new() -> Self {
92        AsyncFixedBuf(FixedBuf::new())
93    }
94
95    /// Drops the struct and returns its internal
96    /// [`FixedBuf`](https://docs.rs/fixed-buffer/latest/fixed_buffer/struct.FixedBuf.html).
97    pub fn into_inner(self) -> FixedBuf<SIZE> {
98        self.0
99    }
100
101    /// Makes a new empty buffer.
102    ///
103    /// Consumes `mem` and uses it as the internal memory array.
104    /// ```
105    pub fn empty(mem: [u8; SIZE]) -> Self {
106        Self(FixedBuf::empty(mem))
107    }
108
109    /// Makes a new full buffer containing the bytes in `mem`.
110    /// Reading the buffer will return the bytes in `mem`.
111    ///
112    /// Consumes `mem` and uses it as the internal memory array.
113    /// ```
114    pub fn filled(mem: [u8; SIZE]) -> Self {
115        Self(FixedBuf::filled(mem))
116    }
117
118    /// Reads from `reader` once and writes the data into the buffer.
119    ///
120    /// Returns [`InvalidData`](std::io::ErrorKind::InvalidData)
121    /// if there is no empty space in the buffer.
122    /// See [`shift`](#method.shift).
123    pub async fn copy_once_from<R: tokio::io::AsyncRead + std::marker::Unpin + Send>(
124        &mut self,
125        reader: &mut R,
126    ) -> Result<usize, std::io::Error> {
127        let writable = self.writable();
128        if writable.is_empty() {
129            return Err(std::io::Error::new(
130                std::io::ErrorKind::InvalidData,
131                "no empty space in buffer",
132            ));
133        };
134        let num_read = tokio::io::AsyncReadExt::read(reader, writable).await?;
135        self.wrote(num_read);
136        Ok(num_read)
137    }
138
139    /// Async version of
140    /// [`FixedBuf::read_frame`](https://docs.rs/fixed-buffer/latest/fixed_buffer/struct.FixedBuf.html#method.read_frame).
141    pub async fn read_frame<R, F>(
142        &mut self,
143        reader: &mut R,
144        deframer_fn: F,
145    ) -> Result<Option<&[u8]>, std::io::Error>
146    where
147        R: tokio::io::AsyncRead + std::marker::Unpin + Send,
148        F: Fn(&[u8]) -> Result<Option<(core::ops::Range<usize>, usize)>, MalformedInputError>,
149    {
150        loop {
151            if !self.is_empty() {
152                if let Some(frame_range) = self.deframe(&deframer_fn)? {
153                    return Ok(Some(&self.mem()[frame_range]));
154                }
155                // None case falls through.
156            }
157            self.shift();
158            let writable = self.writable();
159            if writable.is_empty() {
160                return Err(std::io::Error::new(
161                    std::io::ErrorKind::InvalidData,
162                    "end of buffer full",
163                ));
164            };
165            let num_read = tokio::io::AsyncReadExt::read(reader, writable).await?;
166            if num_read == 0 {
167                if self.is_empty() {
168                    return Ok(None);
169                }
170                return Err(std::io::Error::new(
171                    std::io::ErrorKind::UnexpectedEof,
172                    "eof after reading part of a frame",
173                ));
174            }
175            self.wrote(num_read);
176        }
177    }
178}
179
180impl<const SIZE: usize> Unpin for AsyncFixedBuf<SIZE> {}
181
182impl<const SIZE: usize> std::ops::Deref for AsyncFixedBuf<SIZE> {
183    type Target = FixedBuf<SIZE>;
184    fn deref(&self) -> &Self::Target {
185        &self.0
186    }
187}
188
189impl<const SIZE: usize> std::ops::DerefMut for AsyncFixedBuf<SIZE> {
190    fn deref_mut(&mut self) -> &mut Self::Target {
191        &mut self.0
192    }
193}
194
195impl<const SIZE: usize> tokio::io::AsyncRead for AsyncFixedBuf<SIZE> {
196    fn poll_read(
197        self: Pin<&mut Self>,
198        _cx: &mut Context<'_>,
199        buf: &mut tokio::io::ReadBuf<'_>,
200    ) -> Poll<Result<(), std::io::Error>> {
201        let num_read = self
202            .get_mut()
203            .0
204            .read_and_copy_bytes(buf.initialize_unfilled());
205        buf.advance(num_read);
206        Poll::Ready(Ok(()))
207    }
208}
209
210impl<const SIZE: usize> tokio::io::AsyncWrite for AsyncFixedBuf<SIZE> {
211    fn poll_write(
212        self: Pin<&mut Self>,
213        _cx: &mut Context<'_>,
214        buf: &[u8],
215    ) -> Poll<Result<usize, std::io::Error>> {
216        Poll::Ready(self.get_mut().0.write_bytes(buf).map_err(|_| {
217            std::io::Error::new(std::io::ErrorKind::InvalidData, "no space in buffer")
218        }))
219    }
220
221    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
222        Poll::Ready(Ok(()))
223    }
224
225    fn poll_shutdown(
226        self: Pin<&mut Self>,
227        _cx: &mut Context<'_>,
228    ) -> Poll<Result<(), std::io::Error>> {
229        Poll::Ready(Ok(()))
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use fixed_buffer::*;
    use std::io::{Cursor, ErrorKind};

    /// Line deframer that rejects any input containing `x` or `X`,
    /// and otherwise defers to `deframe_line`.
    fn deframe_line_reject_xs(
        data: &[u8],
    ) -> Result<Option<(core::ops::Range<usize>, usize)>, MalformedInputError> {
        let has_x = data.iter().any(|byte| matches!(byte, b'x' | b'X'));
        if has_x {
            return Err(MalformedInputError::new(String::from("err1")));
        }
        deframe_line(data)
    }

    // Empty buffer, empty reader: a clean end-of-file.
    #[tokio::test]
    async fn test_read_frame_empty_to_eof() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        let mut source = Cursor::new(b"");
        let frame = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap();
        assert_eq!(None, frame);
        assert_eq!("", escape_ascii(fixed.readable()));
    }

    // Empty buffer, reader ends before the line delimiter.
    #[tokio::test]
    async fn test_read_frame_empty_to_incomplete() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        let mut source = Cursor::new(b"abc");
        let error = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap_err();
        assert_eq!(ErrorKind::UnexpectedEof, error.kind());
        assert_eq!("abc", escape_ascii(fixed.readable()));
    }

    // Empty buffer, reader supplies exactly one line.
    #[tokio::test]
    async fn test_read_frame_empty_to_complete() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        let mut source = Cursor::new(b"abc\n");
        let frame = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap()
            .unwrap();
        assert_eq!("abc", escape_ascii(frame));
        assert_eq!("", escape_ascii(fixed.readable()));
    }

    // Empty buffer, reader supplies one line plus extra bytes.
    #[tokio::test]
    async fn test_read_frame_empty_to_complete_with_leftover() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        let mut source = Cursor::new(b"abc\nde");
        let frame = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap()
            .unwrap();
        assert_eq!("abc", escape_ascii(frame));
        assert_eq!("de", escape_ascii(fixed.readable()));
    }

    // Empty buffer, reader supplies bytes the deframer rejects.
    #[tokio::test]
    async fn test_read_frame_empty_to_invalid() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        let mut source = Cursor::new(b"x");
        let error = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap_err();
        assert_eq!(ErrorKind::InvalidData, error.kind());
        assert_eq!("x", escape_ascii(fixed.readable()));
    }

    // Partial frame in the buffer, reader is already at end-of-file.
    #[tokio::test]
    async fn test_read_frame_incomplete_to_eof() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        fixed.write_str("a").unwrap();
        let mut source = Cursor::new(b"");
        let error = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap_err();
        assert_eq!(ErrorKind::UnexpectedEof, error.kind());
        assert_eq!("a", escape_ascii(fixed.readable()));
    }

    // Partial frame in the buffer, reader adds more but never the delimiter.
    #[tokio::test]
    async fn test_read_frame_incomplete_to_incomplete() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        fixed.write_str("a").unwrap();
        let mut source = Cursor::new(b"bc");
        let error = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap_err();
        assert_eq!(ErrorKind::UnexpectedEof, error.kind());
        assert_eq!("abc", escape_ascii(fixed.readable()));
    }

    // Partial frame in the buffer, reader completes it.
    #[tokio::test]
    async fn test_read_frame_incomplete_to_complete() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        fixed.write_str("a").unwrap();
        let mut source = Cursor::new(b"bc\n");
        let frame = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap()
            .unwrap();
        assert_eq!("abc", escape_ascii(frame));
        assert_eq!("", escape_ascii(fixed.readable()));
    }

    // Partial frame in the buffer, reader completes it and adds extra bytes.
    #[tokio::test]
    async fn test_read_frame_incomplete_to_complete_with_leftover() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        fixed.write_str("a").unwrap();
        let mut source = Cursor::new(b"bc\nde");
        let frame = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap()
            .unwrap();
        assert_eq!("abc", escape_ascii(frame));
        assert_eq!("de", escape_ascii(fixed.readable()));
    }

    // A complete frame is already buffered.
    #[tokio::test]
    async fn test_read_frame_complete_doesnt_read() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        fixed.write_str("abc\n").unwrap();
        let mut source = FakeAsyncReadWriter::empty();
        let frame = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap()
            .unwrap();
        assert_eq!("abc", escape_ascii(frame));
        assert_eq!("", escape_ascii(fixed.readable()));
    }

    // A complete frame plus extra bytes are already buffered.
    #[tokio::test]
    async fn test_read_frame_complete_leaves_leftovers() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        fixed.write_str("abc\nde").unwrap();
        let mut source = FakeAsyncReadWriter::empty();
        let frame = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap()
            .unwrap();
        assert_eq!("abc", escape_ascii(frame));
        assert_eq!("de", escape_ascii(fixed.readable()));
    }

    // Rejected bytes are already buffered.
    #[tokio::test]
    async fn test_read_frame_invalid_doesnt_read() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        fixed.write_str("x").unwrap();
        let mut source = FakeAsyncReadWriter::empty();
        let error = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap_err();
        assert_eq!(ErrorKind::InvalidData, error.kind());
        assert_eq!("x", escape_ascii(fixed.readable()));
    }

    // The buffer is full and holds no delimiter.
    #[tokio::test]
    async fn test_read_frame_buffer_full() {
        let mut fixed = AsyncFixedBuf::<8>::new();
        fixed.write_str("abcdefgh").unwrap();
        let mut source = Cursor::new(b"bc\nde");
        let error = fixed
            .read_frame(&mut source, deframe_line_reject_xs)
            .await
            .unwrap_err();
        assert_eq!(ErrorKind::InvalidData, error.kind());
        assert_eq!("abcdefgh", escape_ascii(fixed.readable()));
    }

    #[tokio::test]
    async fn test_async_read() {
        let mut fixed = AsyncFixedBuf::<16>::new();
        let mut scratch = [b'.'; 16];

        // Reading an empty buffer yields nothing and leaves `scratch` alone.
        let count = tokio::io::AsyncReadExt::read(&mut fixed, &mut scratch)
            .await
            .unwrap();
        assert_eq!(0, count);
        assert_eq!("..........", escape_ascii(&scratch[..10]));

        // Three buffered bytes come back, then the buffer is empty again.
        fixed.write_str("abc").unwrap();
        let count = tokio::io::AsyncReadExt::read(&mut fixed, &mut scratch)
            .await
            .unwrap();
        assert_eq!(3, count);
        assert_eq!("abc.......", escape_ascii(&scratch[..10]));
        let count = tokio::io::AsyncReadExt::read(&mut fixed, &mut scratch)
            .await
            .unwrap();
        assert_eq!(0, count);

        // A completely full buffer is drained in one read.
        let many_bs = "b".repeat(16);
        fixed.write_str(&many_bs).unwrap();
        let count = tokio::io::AsyncReadExt::read(&mut fixed, &mut scratch)
            .await
            .unwrap();
        assert_eq!(16, count);
        assert_eq!(many_bs, escape_ascii(&scratch[..]));
        let count = tokio::io::AsyncReadExt::read(&mut fixed, &mut scratch)
            .await
            .unwrap();
        assert_eq!(0, count);
    }

    #[tokio::test]
    async fn test_async_write() {
        let mut fixed = AsyncFixedBuf::<16>::new();

        tokio::io::AsyncWriteExt::write_all(&mut fixed, b"abc")
            .await
            .unwrap();
        assert_eq!("abc", escape_ascii(fixed.readable()));

        tokio::io::AsyncWriteExt::write_all(&mut fixed, b"def")
            .await
            .unwrap();
        assert_eq!("abcdef", escape_ascii(fixed.readable()));

        // Consume one byte, then keep appending.
        fixed.read_bytes(1);
        tokio::io::AsyncWriteExt::write_all(&mut fixed, b"g")
            .await
            .unwrap();
        assert_eq!("bcdefg", escape_ascii(fixed.readable()));

        let eight_hs = "h".repeat(8);
        tokio::io::AsyncWriteExt::write_all(&mut fixed, eight_hs.as_bytes())
            .await
            .unwrap();
        tokio::io::AsyncWriteExt::write_all(&mut fixed, b"i")
            .await
            .unwrap();

        // A further write is rejected.
        let error = tokio::io::AsyncWriteExt::write_all(&mut fixed, b"def")
            .await
            .unwrap_err();
        assert_eq!(ErrorKind::InvalidData, error.kind());
    }
}
517}