Skip to main content

bytesbuf_io/testing/
fake_read.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use std::any::type_name;
5use std::convert::Infallible;
6use std::num::NonZero;
7
8use bytesbuf::mem::testing::TransparentMemory;
9use bytesbuf::mem::{HasMemory, Memory, MemoryShared, OpaqueMemory};
10use bytesbuf::{BytesBuf, BytesView};
11
12use crate::Read;
13
// The read API lets each stream choose its own "optimal" read size. A fake stream has no
// real optimum, so it uses this arbitrary fixed value. It could become a builder option,
// but no scenario has needed that so far, so it stays a constant for simplicity.
const PREFERRED_READ_SIZE: usize = 100;
18
19/// A [`Read`] that reads from a [`BytesView`].
20///
21/// This is for test and example purposes only and is not optimized for performance.
22#[derive(Debug)]
23pub struct FakeRead {
24    contents: BytesView,
25
26    // For testing purposes, we may choose to limit the read size and
27    // thereby force the caller to do multiple read operations.
28    max_read_size: Option<NonZero<usize>>,
29
30    memory: OpaqueMemory,
31}
32
33impl FakeRead {
34    /// Starts building a new `FakeRead`.
35    #[must_use]
36    pub fn builder() -> FakeReadBuilder {
37        FakeReadBuilder {
38            contents: None,
39            max_read_size: None,
40            memory: OpaqueMemory::new(TransparentMemory::new()),
41        }
42    }
43
44    /// Creates a new `FakeRead` with the given contents and the default configuration.
45    #[must_use]
46    pub fn new(contents: BytesView) -> Self {
47        Self::builder().contents(contents).build()
48    }
49
50    /// Reads at most `len` bytes into the provided buffer.
51    ///
52    /// It is not necessary for `into` to be empty - the buffer may already have some
53    /// bytes of data in it (e.g. from a previous read).
54    ///
55    /// The buffer will be extended with additional memory capacity
56    /// if it does not have enough remaining capacity to fit `len` additional bytes.
57    ///
58    /// Returns a tuple of the number of bytes read and the updated buffer.
59    ///
60    /// The returned [`BytesBuf`] will have 0 or more bytes of data appended to it on success,
61    /// with 0 appended bytes indicating that no more bytes can be read from this source. Any
62    /// data that was already in the buffer will remain untouched.
63    ///
64    /// # Errors
65    ///
66    /// This call never fails.
67    #[cfg_attr(test, mutants::skip)] // Mutations easily lead to infinite loops, not worth the effort.
68    #[expect(clippy::unused_async, reason = "API compatibility between trait and inherent fn")]
69    pub async fn read_at_most_into(&mut self, len: usize, mut into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
70        let bytes_to_read = len
71            .min(self.contents.len())
72            .min(self.max_read_size.map_or(usize::MAX, NonZero::get));
73
74        if bytes_to_read == 0 {
75            return Ok((0, into));
76        }
77
78        let data_to_read = self.contents.range(0..bytes_to_read);
79        into.put_bytes(data_to_read);
80
81        self.contents.advance(bytes_to_read);
82
83        Ok((bytes_to_read, into))
84    }
85
86    // Reads an unspecified number of bytes into the provided buffer.
87    ///
88    /// The implementation will decide how many bytes to read based on its internal understanding of
89    /// what is optimal for sustained throughput at high efficiency. This may be a fixed size,
90    /// or it may be a variable size based on the current state of the source.
91    ///
92    /// It is not necessary for `into` to be empty - the buffer may already have some
93    /// bytes of data in it (e.g. from a previous read).
94    ///
95    /// The buffer will be extended with additional memory capacity
96    /// if it does not have enough remaining capacity to fit `len` additional bytes.
97    ///
98    /// Returns a tuple of the number of bytes read and the updated buffer.
99    ///
100    /// The returned [`BytesBuf`] will have 0 or more bytes of data appended to it on success,
101    /// with 0 appended bytes indicating that no more bytes can be read from this source. Any
102    /// data that was already in the buffer will remain untouched.
103    ///
104    /// # Errors
105    ///
106    /// This call never fails.
107    #[cfg_attr(test, mutants::skip)] // Mutations easily lead to infinite loops, not worth the effort.
108    pub async fn read_more_into(&mut self, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
109        let previous_len = into.len();
110
111        self.read_at_most_into(PREFERRED_READ_SIZE, into)
112            .await
113            .inspect(|result| debug_assert_eq!(previous_len + result.0, result.1.len()))
114    }
115
116    /// Reads an unspecified number of bytes as a new buffer.
117    ///
118    /// The implementation will decide how many bytes to read based on its internal understanding of
119    /// what is optimal for sustained throughput at high efficiency. This may be a fixed size,
120    /// or it may be a variable size based on the current state of the source.
121    ///
122    /// The returned [`BytesBuf`] will have 0 or more bytes of data appended to it on success,
123    /// with 0 appended bytes indicating that no more bytes can be read from this source.
124    ///
125    /// # Security
126    ///
127    /// **This method is insecure if the side producing the bytes is not trusted**. An attacker
128    /// may trickle data byte-by-byte, consuming a large amount of I/O resources.
129    ///
130    /// Robust code working with untrusted sources should take precautions such as only processing
131    /// read data when either a time or length threshold is reached and reusing buffers that
132    /// have remaining capacity, appending additional data to existing buffers using
133    /// [`read_more_into()`][crate::Read::read_more_into] instead of reserving new buffers
134    /// for each read operation.
135    ///
136    /// # Errors
137    ///
138    /// This call never fails.
139    pub async fn read_any(&mut self) -> Result<BytesBuf, Infallible> {
140        Ok(self.read_at_most_into(PREFERRED_READ_SIZE, BytesBuf::new()).await?.1)
141    }
142
143    /// Returns the memory provider that was configured in the builder.
144    #[must_use]
145    pub fn memory(&self) -> impl MemoryShared {
146        self.memory.clone()
147    }
148
149    /// Reserves at least `min_bytes` bytes of memory capacity.
150    ///
151    /// Returns an empty [`BytesBuf`] that can be used to fill the reserved memory with data.
152    ///
153    /// The memory provider may provide more memory than requested.
154    ///
155    /// The memory reservation request will always be fulfilled, obtaining more memory from the
156    /// operating system if necessary.
157    ///
158    /// # Zero-sized reservations
159    ///
160    /// Reserving zero bytes of memory is a valid operation and will return a [`BytesBuf`]
161    /// with zero or more bytes of capacity.
162    ///
163    /// # Panics
164    ///
165    /// May panic if the operating system runs out of memory.
166    #[must_use]
167    pub fn reserve(&self, min_bytes: usize) -> BytesBuf {
168        self.memory.reserve(min_bytes)
169    }
170}
171
172#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarder.
173impl Read for FakeRead {
174    type Error = Infallible;
175
176    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
177    async fn read_at_most_into(&mut self, len: usize, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
178        self.read_at_most_into(len, into).await
179    }
180
181    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
182    async fn read_more_into(&mut self, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
183        self.read_more_into(into).await
184    }
185
186    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
187    async fn read_any(&mut self) -> Result<BytesBuf, Infallible> {
188        self.read_any().await
189    }
190}
191
192#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarder.
193impl Memory for FakeRead {
194    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
195    fn reserve(&self, min_bytes: usize) -> BytesBuf {
196        self.reserve(min_bytes)
197    }
198}
199
200#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarder.
201impl HasMemory for FakeRead {
202    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
203    fn memory(&self) -> impl MemoryShared {
204        self.memory()
205    }
206}
207
208/// Creates an instance of [`FakeRead`].
209///
210/// Access through [`FakeRead::builder()`][FakeRead::builder].
211#[derive(Debug)]
212pub struct FakeReadBuilder {
213    contents: Option<BytesView>,
214    max_read_size: Option<NonZero<usize>>,
215    memory: OpaqueMemory,
216}
217
218impl FakeReadBuilder {
219    /// The data to return from read operations. Mandatory.
220    #[must_use]
221    pub fn contents(mut self, contents: BytesView) -> Self {
222        self.contents = Some(contents);
223        self
224    }
225
226    /// Restricts the result of a single read operation to at most `max_read_size` bytes.
227    ///
228    /// Optional. Defaults to no limit.
229    #[must_use]
230    pub fn max_read_size(mut self, max_read_size: NonZero<usize>) -> Self {
231        self.max_read_size = Some(max_read_size);
232        self
233    }
234
235    /// The memory provider to use in memory-related stream operations.
236    ///
237    /// Optional. Defaults to using the Rust global allocator.
238    #[must_use]
239    pub fn memory(mut self, memory: OpaqueMemory) -> Self {
240        self.memory = memory;
241        self
242    }
243
244    /// Builds the `FakeRead` with the provided configuration.
245    ///
246    /// # Panics
247    ///
248    /// Panics if the contents of the stream have not been set.
249    #[must_use]
250    pub fn build(self) -> FakeRead {
251        assert!(self.contents.is_some(), "{} requires a sequence to be set", type_name::<Self>());
252
253        FakeRead {
254            contents: self.contents.expect("guarded by assertion above"),
255            max_read_size: self.max_read_size,
256            memory: self.memory,
257        }
258    }
259}
260
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use bytesbuf::mem::GlobalPool;
    use new_zealand::nz;
    use testing_aids::async_test;

    use super::*;
    use crate::ReadExt;

    #[test]
    fn smoke_test() {
        async_test(async || {
            let memory = GlobalPool::new();
            let mut buf = memory.reserve(100);
            buf.put_byte(1);
            buf.put_byte(2);
            buf.put_byte(3);

            let contents = buf.consume_all();

            let mut read_stream = FakeRead::new(contents);

            let stream_memory = read_stream.reserve(1234);
            assert!(stream_memory.capacity() >= 1234);

            let mut payload = read_stream.read_exactly(3).await.unwrap();
            assert_eq!(payload.len(), 3);

            assert_eq!(payload.get_byte(), 1);
            assert_eq!(payload.get_byte(), 2);
            assert_eq!(payload.get_byte(), 3);

            let final_read = read_stream.read_any().await.unwrap();
            assert_eq!(final_read.len(), 0);
        });
    }

    #[test]
    fn with_max_read_size() {
        async_test(async || {
            let memory = GlobalPool::new();
            let test_data = BytesView::copied_from_slice(b"Hello, world!", &memory);
            let mut read_stream = FakeRead::builder().contents(test_data).max_read_size(nz!(2)).build();

            // He
            let mut payload = read_stream.read_any().await.unwrap().consume_all();
            assert_eq!(payload.len(), 2);
            assert_eq!(payload.get_byte(), b'H');
            assert_eq!(payload.get_byte(), b'e');

            // ll
            payload = read_stream.read_any().await.unwrap().consume_all();
            assert_eq!(payload.len(), 2);
            assert_eq!(payload.get_byte(), b'l');
            assert_eq!(payload.get_byte(), b'l');

            // o,
            payload = read_stream.read_any().await.unwrap().consume_all();
            assert_eq!(payload.len(), 2);
            assert_eq!(payload.get_byte(), b'o');
            assert_eq!(payload.get_byte(), b',');

            //  w
            payload = read_stream.read_any().await.unwrap().consume_all();
            assert_eq!(payload.len(), 2);
            assert_eq!(payload.get_byte(), b' ');
            assert_eq!(payload.get_byte(), b'w');

            // or
            payload = read_stream.read_any().await.unwrap().consume_all();
            assert_eq!(payload.len(), 2);
            assert_eq!(payload.get_byte(), b'o');
            assert_eq!(payload.get_byte(), b'r');

            // ld
            payload = read_stream.read_any().await.unwrap().consume_all();
            assert_eq!(payload.len(), 2);
            assert_eq!(payload.get_byte(), b'l');
            assert_eq!(payload.get_byte(), b'd');

            // !
            payload = read_stream.read_any().await.unwrap().consume_all();
            assert_eq!(payload.len(), 1);
            assert_eq!(payload.get_byte(), b'!');

            // Contents exhausted: the stream must now report end-of-data.
            let final_read = read_stream.read_any().await.unwrap();
            assert_eq!(final_read.len(), 0);
        });
    }

    #[test]
    fn read_at_most_into_honors_len_and_max_read_size() {
        async_test(async || {
            let memory = GlobalPool::new();
            let test_data = BytesView::copied_from_slice(b"Hello, world!", &memory);
            let mut read_stream = FakeRead::builder().contents(test_data).max_read_size(nz!(5)).build();

            // A zero-length request reads nothing and must not consume any contents.
            let (bytes_read, buf) = read_stream.read_at_most_into(0, BytesBuf::new()).await.unwrap();
            assert_eq!(bytes_read, 0);
            assert_eq!(buf.len(), 0);

            // `len` below the max read size limits the read: "Hel".
            let (bytes_read, buf) = read_stream.read_at_most_into(3, BytesBuf::new()).await.unwrap();
            assert_eq!(bytes_read, 3);
            let mut payload = buf.consume_all();
            assert_eq!(payload.get_byte(), b'H');
            assert_eq!(payload.get_byte(), b'e');
            assert_eq!(payload.get_byte(), b'l');

            // `len` above the max read size is capped by it: "lo, w".
            let (bytes_read, buf) = read_stream.read_at_most_into(100, BytesBuf::new()).await.unwrap();
            assert_eq!(bytes_read, 5);
            let mut payload = buf.consume_all();
            assert_eq!(payload.get_byte(), b'l');
            assert_eq!(payload.get_byte(), b'o');
            assert_eq!(payload.get_byte(), b',');
            assert_eq!(payload.get_byte(), b' ');
            assert_eq!(payload.get_byte(), b'w');
        });
    }

    #[test]
    fn read_more() {
        async_test(async || {
            let memory = GlobalPool::new();
            let test_data = BytesView::copied_from_slice(b"Hello, world!", &memory);
            let mut read_stream = FakeRead::builder().contents(test_data.clone()).max_read_size(nz!(2)).build();

            let mut payload_buffer = read_stream.reserve(100);

            loop {
                let previous_len = payload_buffer.len();

                let (bytes_read, new_payload_buffer) = read_stream.read_more_into(payload_buffer).await.unwrap();

                payload_buffer = new_payload_buffer;

                // Sanity check.
                assert_eq!(previous_len + bytes_read, payload_buffer.len());

                if bytes_read == 0 {
                    break;
                }
            }

            assert_eq!(payload_buffer.len(), test_data.len());

            let mut payload = payload_buffer.consume_all();
            assert_eq!(payload.len(), test_data.len());
            assert_eq!(payload.get_byte(), b'H');
            assert_eq!(payload.get_byte(), b'e');
            assert_eq!(payload.get_byte(), b'l');
            assert_eq!(payload.get_byte(), b'l');
            assert_eq!(payload.get_byte(), b'o');
            assert_eq!(payload.get_byte(), b',');
            assert_eq!(payload.get_byte(), b' ');
            assert_eq!(payload.get_byte(), b'w');
            assert_eq!(payload.get_byte(), b'o');
            assert_eq!(payload.get_byte(), b'r');
            assert_eq!(payload.get_byte(), b'l');
            assert_eq!(payload.get_byte(), b'd');
            assert_eq!(payload.get_byte(), b'!');
            assert_eq!(payload.len(), 0);
        });
    }

    #[test]
    fn memory_returns_configured_provider() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        // `TransparentMemory` and `OpaqueMemory` already come in via `use super::*`.
        use bytesbuf::mem::CallbackMemory;

        let callback_called = Arc::new(AtomicBool::new(false));

        let custom_memory = OpaqueMemory::new(CallbackMemory::new({
            let callback_called = Arc::clone(&callback_called);
            move |min_bytes| {
                callback_called.store(true, Ordering::SeqCst);
                TransparentMemory::new().reserve(min_bytes)
            }
        }));

        let memory = GlobalPool::new();
        let contents = BytesView::copied_from_slice(b"test", &memory);
        let read_stream = FakeRead::builder().contents(contents).memory(custom_memory).build();

        // Get memory from stream and use it
        let stream_memory = read_stream.memory();
        let _buf = stream_memory.reserve(10);

        assert!(
            callback_called.load(Ordering::SeqCst),
            "Custom memory callback should have been called"
        );
    }
}