1use std::any::type_name;
5use std::convert::Infallible;
6use std::num::NonZero;
7
8use bytesbuf::mem::testing::TransparentMemory;
9use bytesbuf::mem::{HasMemory, Memory, MemoryShared, OpaqueMemory};
10use bytesbuf::{BytesBuf, BytesView};
11
12use crate::Read;
13
14const PREFERRED_READ_SIZE: usize = 100;
18
19#[derive(Debug)]
23pub struct FakeRead {
24 contents: BytesView,
25
26 max_read_size: Option<NonZero<usize>>,
29
30 memory: OpaqueMemory,
31}
32
33impl FakeRead {
34 #[must_use]
36 pub fn builder() -> FakeReadBuilder {
37 FakeReadBuilder {
38 contents: None,
39 max_read_size: None,
40 memory: OpaqueMemory::new(TransparentMemory::new()),
41 }
42 }
43
44 #[must_use]
46 pub fn new(contents: BytesView) -> Self {
47 Self::builder().contents(contents).build()
48 }
49
50 #[cfg_attr(test, mutants::skip)] #[expect(clippy::unused_async, reason = "API compatibility between trait and inherent fn")]
69 pub async fn read_at_most_into(&mut self, len: usize, mut into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
70 let bytes_to_read = len
71 .min(self.contents.len())
72 .min(self.max_read_size.map_or(usize::MAX, NonZero::get));
73
74 if bytes_to_read == 0 {
75 return Ok((0, into));
76 }
77
78 let data_to_read = self.contents.range(0..bytes_to_read);
79 into.put_bytes(data_to_read);
80
81 self.contents.advance(bytes_to_read);
82
83 Ok((bytes_to_read, into))
84 }
85
86 #[cfg_attr(test, mutants::skip)] pub async fn read_more_into(&mut self, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
109 let previous_len = into.len();
110
111 self.read_at_most_into(PREFERRED_READ_SIZE, into)
112 .await
113 .inspect(|result| debug_assert_eq!(previous_len + result.0, result.1.len()))
114 }
115
116 pub async fn read_any(&mut self) -> Result<BytesBuf, Infallible> {
140 Ok(self.read_at_most_into(PREFERRED_READ_SIZE, BytesBuf::new()).await?.1)
141 }
142
143 #[must_use]
145 pub fn memory(&self) -> impl MemoryShared {
146 self.memory.clone()
147 }
148
149 #[must_use]
167 pub fn reserve(&self, min_bytes: usize) -> BytesBuf {
168 self.memory.reserve(min_bytes)
169 }
170}
171
172#[cfg_attr(coverage_nightly, coverage(off))] impl Read for FakeRead {
174 type Error = Infallible;
175
176 #[cfg_attr(test, mutants::skip)] async fn read_at_most_into(&mut self, len: usize, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
178 self.read_at_most_into(len, into).await
179 }
180
181 #[cfg_attr(test, mutants::skip)] async fn read_more_into(&mut self, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
183 self.read_more_into(into).await
184 }
185
186 #[cfg_attr(test, mutants::skip)] async fn read_any(&mut self) -> Result<BytesBuf, Infallible> {
188 self.read_any().await
189 }
190}
191
192#[cfg_attr(coverage_nightly, coverage(off))] impl Memory for FakeRead {
194 #[cfg_attr(test, mutants::skip)] fn reserve(&self, min_bytes: usize) -> BytesBuf {
196 self.reserve(min_bytes)
197 }
198}
199
200#[cfg_attr(coverage_nightly, coverage(off))] impl HasMemory for FakeRead {
202 #[cfg_attr(test, mutants::skip)] fn memory(&self) -> impl MemoryShared {
204 self.memory()
205 }
206}
207
208#[derive(Debug)]
212pub struct FakeReadBuilder {
213 contents: Option<BytesView>,
214 max_read_size: Option<NonZero<usize>>,
215 memory: OpaqueMemory,
216}
217
218impl FakeReadBuilder {
219 #[must_use]
221 pub fn contents(mut self, contents: BytesView) -> Self {
222 self.contents = Some(contents);
223 self
224 }
225
226 #[must_use]
230 pub fn max_read_size(mut self, max_read_size: NonZero<usize>) -> Self {
231 self.max_read_size = Some(max_read_size);
232 self
233 }
234
235 #[must_use]
239 pub fn memory(mut self, memory: OpaqueMemory) -> Self {
240 self.memory = memory;
241 self
242 }
243
244 #[must_use]
250 pub fn build(self) -> FakeRead {
251 assert!(self.contents.is_some(), "{} requires a sequence to be set", type_name::<Self>());
252
253 FakeRead {
254 contents: self.contents.expect("guarded by assertion above"),
255 max_read_size: self.max_read_size,
256 memory: self.memory,
257 }
258 }
259}
260
261#[cfg(test)]
262#[cfg_attr(coverage_nightly, coverage(off))]
263mod tests {
264 use bytesbuf::mem::GlobalPool;
265 use new_zealand::nz;
266 use testing_aids::async_test;
267
268 use super::*;
269 use crate::ReadExt;
270
271 #[test]
272 fn smoke_test() {
273 async_test(async || {
274 let memory = GlobalPool::new();
275 let mut buf = memory.reserve(100);
276 buf.put_byte(1);
277 buf.put_byte(2);
278 buf.put_byte(3);
279
280 let contents = buf.consume_all();
281
282 let mut read_stream = FakeRead::new(contents);
283
284 let stream_memory = read_stream.reserve(1234);
285 assert!(stream_memory.capacity() >= 1234);
286
287 let mut payload = read_stream.read_exactly(3).await.unwrap();
288 assert_eq!(payload.len(), 3);
289
290 assert_eq!(payload.get_byte(), 1);
291 assert_eq!(payload.get_byte(), 2);
292 assert_eq!(payload.get_byte(), 3);
293
294 let final_read = read_stream.read_any().await.unwrap();
295 assert_eq!(final_read.len(), 0);
296 });
297 }
298
299 #[test]
300 fn with_max_read_size() {
301 async_test(async || {
302 let memory = GlobalPool::new();
303 let test_data = BytesView::copied_from_slice(b"Hello, world!", &memory);
304 let mut read_stream = FakeRead::builder().contents(test_data).max_read_size(nz!(2)).build();
305
306 let mut payload = read_stream.read_any().await.unwrap().consume_all();
308 assert_eq!(payload.len(), 2);
309 assert_eq!(payload.get_byte(), b'H');
310 assert_eq!(payload.get_byte(), b'e');
311
312 payload = read_stream.read_any().await.unwrap().consume_all();
314 assert_eq!(payload.len(), 2);
315 assert_eq!(payload.get_byte(), b'l');
316 assert_eq!(payload.get_byte(), b'l');
317
318 payload = read_stream.read_any().await.unwrap().consume_all();
320 assert_eq!(payload.len(), 2);
321 assert_eq!(payload.get_byte(), b'o');
322 assert_eq!(payload.get_byte(), b',');
323
324 payload = read_stream.read_any().await.unwrap().consume_all();
326 assert_eq!(payload.len(), 2);
327 assert_eq!(payload.get_byte(), b' ');
328 assert_eq!(payload.get_byte(), b'w');
329
330 payload = read_stream.read_any().await.unwrap().consume_all();
332 assert_eq!(payload.len(), 2);
333 assert_eq!(payload.get_byte(), b'o');
334 assert_eq!(payload.get_byte(), b'r');
335
336 payload = read_stream.read_any().await.unwrap().consume_all();
338 assert_eq!(payload.len(), 2);
339 assert_eq!(payload.get_byte(), b'l');
340 assert_eq!(payload.get_byte(), b'd');
341
342 payload = read_stream.read_any().await.unwrap().consume_all();
344 assert_eq!(payload.len(), 1);
345 assert_eq!(payload.get_byte(), b'!');
346 });
347 }
348
349 #[test]
350 fn read_more() {
351 async_test(async || {
352 let memory = GlobalPool::new();
353 let test_data = BytesView::copied_from_slice(b"Hello, world!", &memory);
354 let mut read_stream = FakeRead::builder().contents(test_data.clone()).max_read_size(nz!(2)).build();
355
356 let mut payload_buffer = read_stream.reserve(100);
357
358 loop {
359 let previous_len = payload_buffer.len();
360
361 let (bytes_read, new_payload_buffer) = read_stream.read_more_into(payload_buffer).await.unwrap();
362
363 payload_buffer = new_payload_buffer;
364
365 assert_eq!(previous_len + bytes_read, payload_buffer.len());
367
368 if bytes_read == 0 {
369 break;
370 }
371 }
372
373 assert_eq!(payload_buffer.len(), test_data.len());
374
375 let mut payload = payload_buffer.consume_all();
376 assert_eq!(payload.len(), test_data.len());
377 assert_eq!(payload.get_byte(), b'H');
378 assert_eq!(payload.get_byte(), b'e');
379 assert_eq!(payload.get_byte(), b'l');
380 assert_eq!(payload.get_byte(), b'l');
381 assert_eq!(payload.get_byte(), b'o');
382 assert_eq!(payload.get_byte(), b',');
383 assert_eq!(payload.get_byte(), b' ');
384 assert_eq!(payload.get_byte(), b'w');
385 assert_eq!(payload.get_byte(), b'o');
386 assert_eq!(payload.get_byte(), b'r');
387 assert_eq!(payload.get_byte(), b'l');
388 assert_eq!(payload.get_byte(), b'd');
389 assert_eq!(payload.get_byte(), b'!');
390 assert_eq!(payload.len(), 0);
391 });
392 }
393
394 #[test]
395 fn memory_returns_configured_provider() {
396 use std::sync::Arc;
397 use std::sync::atomic::{AtomicBool, Ordering};
398
399 use bytesbuf::mem::testing::TransparentMemory;
400 use bytesbuf::mem::{CallbackMemory, OpaqueMemory};
401
402 let callback_called = Arc::new(AtomicBool::new(false));
403
404 let custom_memory = OpaqueMemory::new(CallbackMemory::new({
405 let callback_called = Arc::clone(&callback_called);
406 move |min_bytes| {
407 callback_called.store(true, Ordering::SeqCst);
408 TransparentMemory::new().reserve(min_bytes)
409 }
410 }));
411
412 let memory = GlobalPool::new();
413 let contents = BytesView::copied_from_slice(b"test", &memory);
414 let read_stream = FakeRead::builder().contents(contents).memory(custom_memory).build();
415
416 let stream_memory = read_stream.memory();
418 let _buf = stream_memory.reserve(10);
419
420 assert!(
421 callback_called.load(Ordering::SeqCst),
422 "Custom memory callback should have been called"
423 );
424 }
425}