Skip to main content

bytesbuf_io/testing/
pending.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use std::convert::Infallible;
5use std::future;
6
7use bytesbuf::mem::testing::TransparentMemory;
8use bytesbuf::mem::{HasMemory, Memory, MemoryShared, OpaqueMemory};
9use bytesbuf::{BytesBuf, BytesView};
10
11use crate::{Read, Write};
12
13/// A [`Read`] and [`Write`] that never completes any reads or writes.
14///
15/// Intended for simple tests and examples that need a never-completing stream.
16#[derive(Debug)]
17pub struct Pending {
18    memory: OpaqueMemory,
19}
20
21impl Pending {
22    /// Starts building a new `PendingStream`.
23    #[must_use]
24    pub fn builder() -> PendingBuilder {
25        PendingBuilder {
26            memory: OpaqueMemory::new(TransparentMemory::new()),
27        }
28    }
29
30    /// Creates a new `PendingStream` with the default configuration.
31    #[must_use]
32    pub fn new() -> Self {
33        Self::builder().build()
34    }
35
36    /// Starts a read operation that never completes.
37    ///
38    /// # Errors
39    ///
40    /// This call never fails (because it never completes).
41    #[cfg_attr(coverage_nightly, coverage(off))] // Contains intentionally unreachable code.
42    #[expect(clippy::needless_pass_by_ref_mut, reason = "API compatibility between trait and inherent fn")]
43    pub async fn read_at_most_into(&mut self, _len: usize, _into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
44        future::pending::<()>().await;
45        unreachable!();
46    }
47
48    /// Starts a read operation that never completes.
49    ///
50    /// # Errors
51    ///
52    /// This call never fails (because it never completes).
53    #[cfg_attr(coverage_nightly, coverage(off))] // Contains intentionally unreachable code.
54    #[expect(clippy::needless_pass_by_ref_mut, reason = "API compatibility between trait and inherent fn")]
55    pub async fn read_more_into(&mut self, _into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
56        future::pending::<()>().await;
57        unreachable!();
58    }
59
60    /// Starts a read operation that never completes.
61    ///
62    /// # Errors
63    ///
64    /// This call never fails (because it never completes).
65    #[cfg_attr(coverage_nightly, coverage(off))] // Contains intentionally unreachable code.
66    #[expect(clippy::needless_pass_by_ref_mut, reason = "API compatibility between trait and inherent fn")]
67    pub async fn read_any(&mut self) -> Result<BytesBuf, Infallible> {
68        future::pending::<()>().await;
69        unreachable!();
70    }
71
72    /// Starts a write operation that never completes.
73    ///
74    /// # Errors
75    ///
76    /// This call never fails (because it never completes).
77    #[cfg_attr(coverage_nightly, coverage(off))] // Contains intentionally unreachable code.
78    #[expect(clippy::needless_pass_by_ref_mut, reason = "API compatibility between trait and inherent fn")]
79    pub async fn write(&mut self, _sequence: BytesView) -> Result<(), Infallible> {
80        future::pending::<()>().await;
81        unreachable!();
82    }
83
84    /// Returns the memory provider that was configured in the builder.
85    #[must_use]
86    pub fn memory(&self) -> impl MemoryShared {
87        self.memory.clone()
88    }
89
90    /// Reserves at least `min_bytes` bytes of memory capacity.
91    ///
92    /// Returns an empty [`BytesBuf`] that can be used to fill the reserved memory with data.
93    ///
94    /// The memory provider may provide more memory than requested.
95    ///
96    /// The memory reservation request will always be fulfilled, obtaining more memory from the
97    /// operating system if necessary.
98    ///
99    /// # Zero-sized reservations
100    ///
101    /// Reserving zero bytes of memory is a valid operation and will return a [`BytesBuf`]
102    /// with zero or more bytes of capacity.
103    ///
104    /// # Panics
105    ///
106    /// May panic if the operating system runs out of memory.
107    #[must_use]
108    pub fn reserve(&self, min_bytes: usize) -> BytesBuf {
109        self.memory.reserve(min_bytes)
110    }
111}
112
113impl Default for Pending {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
119#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarder.
120impl Memory for Pending {
121    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
122    fn reserve(&self, min_bytes: usize) -> BytesBuf {
123        self.reserve(min_bytes)
124    }
125}
126
127#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarder.
128impl HasMemory for Pending {
129    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
130    fn memory(&self) -> impl MemoryShared {
131        self.memory()
132    }
133}
134
135#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarder.
136impl Read for Pending {
137    type Error = Infallible;
138
139    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
140    async fn read_at_most_into(&mut self, len: usize, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
141        self.read_at_most_into(len, into).await
142    }
143
144    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
145    async fn read_more_into(&mut self, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
146        self.read_more_into(into).await
147    }
148
149    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
150    async fn read_any(&mut self) -> Result<BytesBuf, Infallible> {
151        self.read_any().await
152    }
153}
154
155#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarder.
156impl Write for Pending {
157    type Error = Infallible;
158
159    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
160    async fn write(&mut self, data: BytesView) -> Result<(), Infallible> {
161        self.write(data).await
162    }
163}
164
165/// Creates an instance of [`Pending`].
166///
167/// Access through [`Pending::builder()`][Pending::builder].
168#[derive(Debug)]
169pub struct PendingBuilder {
170    memory: OpaqueMemory,
171}
172
173impl PendingBuilder {
174    /// The memory provider to use in memory-related stream operations.
175    ///
176    /// The pending stream never reserves memory, so the only purpose of this is to allow the user
177    /// of the null stream to call `memory()` and `reserve()` via the `HasMemory` and `Memory`
178    /// traits that every stream implements.
179    ///
180    /// Optional. Defaults to using the Rust global allocator.
181    #[must_use]
182    pub fn memory(mut self, memory: OpaqueMemory) -> Self {
183        self.memory = memory;
184        self
185    }
186
187    /// Builds the `Pending` with the provided configuration.
188    #[must_use]
189    pub fn build(self) -> Pending {
190        Pending { memory: self.memory }
191    }
192}
193
194#[cfg(test)]
195#[cfg_attr(coverage_nightly, coverage(off))]
196mod tests {
197    use std::pin::pin;
198    use std::sync::Arc;
199    use std::sync::atomic::{AtomicBool, Ordering};
200    use std::task::{Context, Poll, Waker};
201
202    use bytesbuf::mem::CallbackMemory;
203
204    use super::*;
205
206    #[test]
207    fn smoke_test() {
208        let stream = Pending::new();
209
210        let reserved1 = stream.reserve(123);
211        assert!(reserved1.capacity() >= 123);
212
213        let memory = stream.memory();
214        let reserved2 = memory.reserve(123);
215        assert!(reserved2.capacity() >= 123);
216    }
217
218    #[test]
219    fn default_returns_working_instance() {
220        let stream = Pending::default();
221
222        // Verify it can reserve memory
223        let buffer = stream.reserve(100);
224        assert!(buffer.remaining_capacity() >= 100);
225
226        // Verify memory() works
227        let stream_memory = stream.memory();
228        let buffer2 = stream_memory.reserve(50);
229        assert!(buffer2.remaining_capacity() >= 50);
230    }
231
232    #[test]
233    fn memory_returns_configured_provider() {
234        let callback_called = Arc::new(AtomicBool::new(false));
235
236        let custom_memory = OpaqueMemory::new(CallbackMemory::new({
237            let callback_called = Arc::clone(&callback_called);
238            move |min_bytes| {
239                callback_called.store(true, Ordering::SeqCst);
240                TransparentMemory::new().reserve(min_bytes)
241            }
242        }));
243
244        let pending_stream = Pending::builder().memory(custom_memory).build();
245
246        // Get memory from stream and use it
247        let stream_memory = pending_stream.memory();
248        let _buf = stream_memory.reserve(10);
249
250        assert!(
251            callback_called.load(Ordering::SeqCst),
252            "Custom memory callback should have been called"
253        );
254    }
255
256    #[test]
257    fn read_at_most_into_returns_pending_on_first_poll() {
258        let mut stream = Pending::new();
259        let buffer = BytesBuf::new();
260
261        let mut future = pin!(stream.read_at_most_into(100, buffer));
262        let waker = Waker::noop();
263        let mut cx = Context::from_waker(waker);
264
265        let result = future.as_mut().poll(&mut cx);
266        assert!(
267            matches!(result, Poll::Pending),
268            "read_at_most_into should return Pending on first poll"
269        );
270    }
271
272    #[test]
273    fn read_more_into_returns_pending_on_first_poll() {
274        let mut stream = Pending::new();
275        let buffer = BytesBuf::new();
276
277        let mut future = pin!(stream.read_more_into(buffer));
278        let waker = Waker::noop();
279        let mut cx = Context::from_waker(waker);
280
281        let result = future.as_mut().poll(&mut cx);
282        assert!(
283            matches!(result, Poll::Pending),
284            "read_more_into should return Pending on first poll"
285        );
286    }
287
288    #[test]
289    fn read_any_returns_pending_on_first_poll() {
290        let mut stream = Pending::new();
291
292        let mut future = pin!(stream.read_any());
293        let waker = Waker::noop();
294        let mut cx = Context::from_waker(waker);
295
296        let result = future.as_mut().poll(&mut cx);
297        assert!(matches!(result, Poll::Pending), "read_any should return Pending on first poll");
298    }
299
300    #[test]
301    fn write_returns_pending_on_first_poll() {
302        let mut stream = Pending::new();
303        let data = BytesView::default();
304
305        let mut future = pin!(stream.write(data));
306        let waker = Waker::noop();
307        let mut cx = Context::from_waker(waker);
308
309        let result = future.as_mut().poll(&mut cx);
310        assert!(matches!(result, Poll::Pending), "write should return Pending on first poll");
311    }
312}