bytesbuf_io/testing/
pending.rs1use std::convert::Infallible;
5use std::future;
6
7use bytesbuf::mem::testing::TransparentMemory;
8use bytesbuf::mem::{HasMemory, Memory, MemoryShared, OpaqueMemory};
9use bytesbuf::{BytesBuf, BytesView};
10
11use crate::{Read, Write};
12
13#[derive(Debug)]
17pub struct Pending {
18 memory: OpaqueMemory,
19}
20
21impl Pending {
22 #[must_use]
24 pub fn builder() -> PendingBuilder {
25 PendingBuilder {
26 memory: OpaqueMemory::new(TransparentMemory::new()),
27 }
28 }
29
30 #[must_use]
32 pub fn new() -> Self {
33 Self::builder().build()
34 }
35
36 #[cfg_attr(coverage_nightly, coverage(off))] #[expect(clippy::needless_pass_by_ref_mut, reason = "API compatibility between trait and inherent fn")]
43 pub async fn read_at_most_into(&mut self, _len: usize, _into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
44 future::pending::<()>().await;
45 unreachable!();
46 }
47
48 #[cfg_attr(coverage_nightly, coverage(off))] #[expect(clippy::needless_pass_by_ref_mut, reason = "API compatibility between trait and inherent fn")]
55 pub async fn read_more_into(&mut self, _into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
56 future::pending::<()>().await;
57 unreachable!();
58 }
59
60 #[cfg_attr(coverage_nightly, coverage(off))] #[expect(clippy::needless_pass_by_ref_mut, reason = "API compatibility between trait and inherent fn")]
67 pub async fn read_any(&mut self) -> Result<BytesBuf, Infallible> {
68 future::pending::<()>().await;
69 unreachable!();
70 }
71
72 #[cfg_attr(coverage_nightly, coverage(off))] #[expect(clippy::needless_pass_by_ref_mut, reason = "API compatibility between trait and inherent fn")]
79 pub async fn write(&mut self, _sequence: BytesView) -> Result<(), Infallible> {
80 future::pending::<()>().await;
81 unreachable!();
82 }
83
84 #[must_use]
86 pub fn memory(&self) -> impl MemoryShared {
87 self.memory.clone()
88 }
89
90 #[must_use]
108 pub fn reserve(&self, min_bytes: usize) -> BytesBuf {
109 self.memory.reserve(min_bytes)
110 }
111}
112
113impl Default for Pending {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
119#[cfg_attr(coverage_nightly, coverage(off))] impl Memory for Pending {
121 #[cfg_attr(test, mutants::skip)] fn reserve(&self, min_bytes: usize) -> BytesBuf {
123 self.reserve(min_bytes)
124 }
125}
126
127#[cfg_attr(coverage_nightly, coverage(off))] impl HasMemory for Pending {
129 #[cfg_attr(test, mutants::skip)] fn memory(&self) -> impl MemoryShared {
131 self.memory()
132 }
133}
134
135#[cfg_attr(coverage_nightly, coverage(off))] impl Read for Pending {
137 type Error = Infallible;
138
139 #[cfg_attr(test, mutants::skip)] async fn read_at_most_into(&mut self, len: usize, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
141 self.read_at_most_into(len, into).await
142 }
143
144 #[cfg_attr(test, mutants::skip)] async fn read_more_into(&mut self, into: BytesBuf) -> Result<(usize, BytesBuf), Infallible> {
146 self.read_more_into(into).await
147 }
148
149 #[cfg_attr(test, mutants::skip)] async fn read_any(&mut self) -> Result<BytesBuf, Infallible> {
151 self.read_any().await
152 }
153}
154
155#[cfg_attr(coverage_nightly, coverage(off))] impl Write for Pending {
157 type Error = Infallible;
158
159 #[cfg_attr(test, mutants::skip)] async fn write(&mut self, data: BytesView) -> Result<(), Infallible> {
161 self.write(data).await
162 }
163}
164
165#[derive(Debug)]
169pub struct PendingBuilder {
170 memory: OpaqueMemory,
171}
172
173impl PendingBuilder {
174 #[must_use]
182 pub fn memory(mut self, memory: OpaqueMemory) -> Self {
183 self.memory = memory;
184 self
185 }
186
187 #[must_use]
189 pub fn build(self) -> Pending {
190 Pending { memory: self.memory }
191 }
192}
193
194#[cfg(test)]
195#[cfg_attr(coverage_nightly, coverage(off))]
196mod tests {
197 use std::pin::pin;
198 use std::sync::Arc;
199 use std::sync::atomic::{AtomicBool, Ordering};
200 use std::task::{Context, Poll, Waker};
201
202 use bytesbuf::mem::CallbackMemory;
203
204 use super::*;
205
206 #[test]
207 fn smoke_test() {
208 let stream = Pending::new();
209
210 let reserved1 = stream.reserve(123);
211 assert!(reserved1.capacity() >= 123);
212
213 let memory = stream.memory();
214 let reserved2 = memory.reserve(123);
215 assert!(reserved2.capacity() >= 123);
216 }
217
218 #[test]
219 fn default_returns_working_instance() {
220 let stream = Pending::default();
221
222 let buffer = stream.reserve(100);
224 assert!(buffer.remaining_capacity() >= 100);
225
226 let stream_memory = stream.memory();
228 let buffer2 = stream_memory.reserve(50);
229 assert!(buffer2.remaining_capacity() >= 50);
230 }
231
232 #[test]
233 fn memory_returns_configured_provider() {
234 let callback_called = Arc::new(AtomicBool::new(false));
235
236 let custom_memory = OpaqueMemory::new(CallbackMemory::new({
237 let callback_called = Arc::clone(&callback_called);
238 move |min_bytes| {
239 callback_called.store(true, Ordering::SeqCst);
240 TransparentMemory::new().reserve(min_bytes)
241 }
242 }));
243
244 let pending_stream = Pending::builder().memory(custom_memory).build();
245
246 let stream_memory = pending_stream.memory();
248 let _buf = stream_memory.reserve(10);
249
250 assert!(
251 callback_called.load(Ordering::SeqCst),
252 "Custom memory callback should have been called"
253 );
254 }
255
256 #[test]
257 fn read_at_most_into_returns_pending_on_first_poll() {
258 let mut stream = Pending::new();
259 let buffer = BytesBuf::new();
260
261 let mut future = pin!(stream.read_at_most_into(100, buffer));
262 let waker = Waker::noop();
263 let mut cx = Context::from_waker(waker);
264
265 let result = future.as_mut().poll(&mut cx);
266 assert!(
267 matches!(result, Poll::Pending),
268 "read_at_most_into should return Pending on first poll"
269 );
270 }
271
272 #[test]
273 fn read_more_into_returns_pending_on_first_poll() {
274 let mut stream = Pending::new();
275 let buffer = BytesBuf::new();
276
277 let mut future = pin!(stream.read_more_into(buffer));
278 let waker = Waker::noop();
279 let mut cx = Context::from_waker(waker);
280
281 let result = future.as_mut().poll(&mut cx);
282 assert!(
283 matches!(result, Poll::Pending),
284 "read_more_into should return Pending on first poll"
285 );
286 }
287
288 #[test]
289 fn read_any_returns_pending_on_first_poll() {
290 let mut stream = Pending::new();
291
292 let mut future = pin!(stream.read_any());
293 let waker = Waker::noop();
294 let mut cx = Context::from_waker(waker);
295
296 let result = future.as_mut().poll(&mut cx);
297 assert!(matches!(result, Poll::Pending), "read_any should return Pending on first poll");
298 }
299
300 #[test]
301 fn write_returns_pending_on_first_poll() {
302 let mut stream = Pending::new();
303 let data = BytesView::default();
304
305 let mut future = pin!(stream.write(data));
306 let waker = Waker::noop();
307 let mut cx = Context::from_waker(waker);
308
309 let result = future.as_mut().poll(&mut cx);
310 assert!(matches!(result, Poll::Pending), "write should return Pending on first poll");
311 }
312}