async_io_map/
read.rs

1use crate::DEFAULT_BUFFER_SIZE;
2use futures_lite::{io, ready, AsyncBufRead, AsyncRead};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6/// A trait for mapping data read from an underlying reader.
7pub trait MapReadFn {
8    /// Applies a mapping function to the data read from the underlying reader.
9    /// This function takes a mutable reference to a buffer and modifies it in place.
10    ///
11    /// The length of the buffer is gauranteed to be equal to the capacity of the underlying
12    /// buffer, until the last read operation, where it may be smaller.
13    fn map_read(&mut self, buf: &mut [u8]);
14}
15
16impl<F> MapReadFn for F
17where
18    F: FnMut(&mut [u8]),
19{
20    fn map_read(&mut self, buf: &mut [u8]) {
21        self(buf)
22    }
23}
24
25pin_project_lite::pin_project! {
26  /// A wrapper around an `AsyncRead` that allows for data processing
27  /// before the actual I/O operation.
28  /// 
29  /// This struct buffers the data read from the underlying reader and applies a mapping function
30  /// to the data before returning it. It is designed to optimize reads by using a buffer
31  /// of a specified size (default is 8KB).
32  /// 
33  /// The buffer size also acts as a threshold for the length of data passed to the mapping function,
34  /// and will be gauranteed to be equal the capacity of the underlying buffer, until the last read operation,
35  /// where it may be smaller.
36  pub struct AsyncMapReader<'a, R> {
37      #[pin]
38      inner: R,
39      process_fn: Box<dyn MapReadFn + 'a>,
40      pos: usize, // Current position in the buffer
41      cap: usize, // Current position and capacity of the buffer
42      buf: Box<[u8]>, // Internal buffer for reading data
43  }
44}
45
46impl<'a, R> AsyncMapReader<'a, R>
47where
48    R: AsyncRead,
49{
50    /// Create a new wrapper around an async reader with a processing function
51    pub fn new(reader: R, process_fn: impl MapReadFn + 'a) -> Self {
52        Self {
53            inner: reader,
54            process_fn: Box::new(process_fn),
55            pos: 0,
56            cap: 0,
57            buf: vec![0; DEFAULT_BUFFER_SIZE].into_boxed_slice(), // Start with a reasonable capacity
58        }
59    }
60
61    /// Create a new wrapper with a specific initial buffer capacity
62    pub fn with_capacity(reader: R, process_fn: impl MapReadFn + 'a, capacity: usize) -> Self {
63        Self {
64            inner: reader,
65            process_fn: Box::new(process_fn),
66            pos: 0,
67            cap: 0,
68            buf: vec![0; capacity].into_boxed_slice(),
69        }
70    }
71
72    /// Consume the wrapper and return the inner reader
73    pub fn into_inner(self) -> R {
74        self.inner
75    }
76}
77
78impl<'a, R> AsyncRead for AsyncMapReader<'a, R>
79where
80    R: AsyncRead,
81{
82    fn poll_read(
83        mut self: Pin<&mut Self>,
84        cx: &mut Context<'_>,
85        buf: &mut [u8],
86    ) -> Poll<io::Result<usize>> {
87        if self.pos == self.cap {
88            let fill = ready!(self.as_mut().poll_fill_buf(cx))?;
89            if fill.is_empty() {
90                return Poll::Ready(Ok(0));
91            }
92        }
93        let rem = {
94            let this = self.as_mut().project();
95            &this.buf[*this.pos..*this.cap]
96        };
97        let amt = std::cmp::min(rem.len(), buf.len());
98        buf[..amt].copy_from_slice(&rem[..amt]);
99        self.consume(amt);
100        Poll::Ready(Ok(amt))
101    }
102}
103
104impl<'a, R: AsyncRead> AsyncBufRead for AsyncMapReader<'a, R> {
105    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
106        let mut this = self.project();
107        if *this.pos >= *this.cap {
108            debug_assert!(*this.pos == *this.cap);
109            *this.pos = 0;
110            *this.cap = 0;
111            let read_amount = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
112            if read_amount == 0 {
113                return Poll::Ready(Ok(&[]));
114            }
115            (this.process_fn).map_read(&mut this.buf[..read_amount]);
116            *this.cap = read_amount;
117        }
118        Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
119    }
120
121    fn consume(self: Pin<&mut Self>, amt: usize) {
122        let this = self.project();
123        *this.pos = std::cmp::min(*this.pos + amt, *this.cap);
124    }
125}
126
127/// A trait for types that can be mapped to an `AsyncMapReader`.
128pub trait AsyncMapRead<'a, R> {
129    /// Maps the underlying reader to an `AsyncMapReader` using the provided mapping function.
130    /// This function uses a default buffer size (8KB) for the internal buffer.
131    fn map(self, f: impl MapReadFn + 'a) -> AsyncMapReader<'a, R>
132    where
133        Self: Sized,
134    {
135        self.map_with_capacity(f, DEFAULT_BUFFER_SIZE)
136    }
137
138    /// Maps the underlying reader to an `AsyncMapReader` using the provided mapping function
139    /// and a specified buffer capacity.
140    ///
141    /// This function allows for more control over the internal buffer size, which can be useful
142    /// for performance tuning.
143    fn map_with_capacity(self, f: impl MapReadFn + 'a, capacity: usize) -> AsyncMapReader<'a, R>;
144}
145
146impl<'a, R: AsyncRead> AsyncMapRead<'a, R> for R {
147    fn map_with_capacity(self, f: impl MapReadFn + 'a, capacity: usize) -> AsyncMapReader<'a, R> {
148        AsyncMapReader::with_capacity(self, f, capacity)
149    }
150}