1#[cfg(doctest)]
26#[doc = include_str!("../README.md")]
27struct ReadMe;
28
29use std::{
30 collections::VecDeque,
31 io,
32 sync::{Arc, Condvar, Mutex, MutexGuard},
33 time::Duration,
34};
35
36struct SyncBuffer {
38 data: Mutex<VecDeque<u8>>,
39 can_read: Condvar,
40 can_write: Condvar,
41}
42
43impl SyncBuffer {
44 fn new(capacity: usize) -> Self {
46 SyncBuffer {
47 data: Mutex::new(VecDeque::with_capacity(capacity)),
48 can_read: Condvar::new(),
49 can_write: Condvar::new(),
50 }
51 }
52
53 fn wait_while<'a, F>(
59 mut data_guard: MutexGuard<'a, VecDeque<u8>>,
60 condvar: &Condvar,
61 timeout: Option<Duration>,
62 condition: F,
63 ) -> io::Result<MutexGuard<'a, VecDeque<u8>>>
64 where
65 F: Fn(&mut VecDeque<u8>) -> bool,
66 {
67 if condition(&mut data_guard) {
68 data_guard = match timeout {
69 Some(Duration::ZERO) => data_guard,
70 Some(timeout) => {
71 let (new_guard, timeout_result) = condvar
72 .wait_timeout_while(data_guard, timeout, condition)
73 .map_err(|_| io::Error::from(io::ErrorKind::Other))?;
74
75 if timeout_result.timed_out() {
76 return Err(io::Error::from(io::ErrorKind::TimedOut));
77 }
78
79 new_guard
80 }
81 None => condvar
82 .wait_while(data_guard, condition)
83 .map_err(|_| io::Error::from(io::ErrorKind::Other))?,
84 };
85 }
86
87 Ok(data_guard)
88 }
89
90 fn wait_for_bytes_available<F>(
97 &self,
98 bytes_required: usize,
99 condvar: &Condvar,
100 timeout: Option<Duration>,
101 get_bytes_available: F,
102 ) -> io::Result<(MutexGuard<VecDeque<u8>>, usize)>
103 where
104 F: Fn(&VecDeque<u8>) -> usize,
105 {
106 let mut data_guard = self.data.lock().unwrap();
107
108 if (bytes_required == 0) || (data_guard.capacity() == 0) {
109 return Ok((data_guard, 0));
110 }
111
112 data_guard = Self::wait_while(data_guard, condvar, timeout, |data| {
113 get_bytes_available(data) == 0
114 })?;
115
116 let bytes_available = bytes_required.min(get_bytes_available(&data_guard));
117
118 Ok((data_guard, bytes_available))
119 }
120
121 fn read(&self, buf: &mut [u8], timeout: Option<Duration>) -> io::Result<usize> {
126 let (mut data_guard, bytes_to_read) =
127 self.wait_for_bytes_available(buf.len(), &self.can_read, timeout, |guard| guard.len())?;
128
129 if bytes_to_read > 0 {
130 for byte in &mut buf[0..bytes_to_read] {
131 *byte = data_guard.pop_front().unwrap();
132 }
133
134 self.can_write.notify_one();
136 }
137
138 Ok(bytes_to_read)
139 }
140
141 fn write(&self, buf: &[u8], timeout: Option<Duration>) -> io::Result<usize> {
146 let (mut data_guard, bytes_to_write) =
147 self.wait_for_bytes_available(buf.len(), &self.can_write, timeout, |guard| {
148 guard.capacity() - guard.len()
149 })?;
150
151 if bytes_to_write > 0 {
152 data_guard.extend(&buf[0..bytes_to_write]);
153
154 self.can_read.notify_one();
156 }
157
158 Ok(bytes_to_write)
159 }
160
161 fn flush(&self, timeout: Option<Duration>) -> io::Result<()> {
164 Self::wait_while(
166 self.data.lock().unwrap(),
167 &self.can_write,
168 timeout,
169 |data| !data.is_empty(),
170 )
171 .map(|_| ())
172 }
173
174 fn clear(&self) {
176 self.data.lock().unwrap().clear();
177 self.can_write.notify_all();
178 }
179
180 fn len(&self) -> usize {
182 self.data.lock().unwrap().len()
183 }
184}
185
186#[derive(Clone)]
194pub struct MockPipe {
195 timeout: Arc<Mutex<Option<Duration>>>,
201
202 read_buffer: Arc<SyncBuffer>,
204
205 write_buffer: Arc<SyncBuffer>,
207}
208
209impl MockPipe {
210 fn from_buffers(read_buffer: Arc<SyncBuffer>, write_buffer: Arc<SyncBuffer>) -> Self {
212 Self {
213 timeout: Arc::new(Mutex::new(Some(Duration::ZERO))),
215 read_buffer,
216 write_buffer,
217 }
218 }
219
220 pub fn loopback(buffer_capacity: usize) -> Self {
225 let buffer = Arc::new(SyncBuffer::new(buffer_capacity));
226 Self::from_buffers(buffer.clone(), buffer)
227 }
228
229 pub fn pair(buffer_capacity: usize) -> (Self, Self) {
233 let buffer1 = Arc::new(SyncBuffer::new(buffer_capacity));
234 let buffer2 = Arc::new(SyncBuffer::new(buffer_capacity));
235
236 let pipe1 = Self::from_buffers(buffer1.clone(), buffer2.clone());
237 let pipe2 = Self::from_buffers(buffer2, buffer1);
238
239 (pipe1, pipe2)
240 }
241
242 pub fn timeout(&self) -> Option<Duration> {
244 *self.timeout.lock().unwrap()
245 }
246
247 pub fn set_timeout(&self, timeout: Option<Duration>) {
252 *self.timeout.lock().unwrap() = timeout;
253 }
254
255 pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
258 self.set_timeout(timeout);
259 self
260 }
261
262 pub fn read_buffer_len(&self) -> usize {
264 self.read_buffer.len()
265 }
266
267 pub fn write_buffer_len(&self) -> usize {
269 self.write_buffer.len()
270 }
271
272 pub fn clear_read(&self) {
274 self.read_buffer.clear();
275 }
276
277 pub fn clear_write(&self) {
279 self.write_buffer.clear();
280 }
281
282 pub fn clear(&self) {
284 self.clear_read();
285 self.clear_write();
286 }
287}
288
289impl io::Read for MockPipe {
290 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
291 self.read_buffer.read(buf, self.timeout())
292 }
293}
294
295impl io::Write for MockPipe {
296 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
297 self.write_buffer.write(buf, self.timeout())
298 }
299
300 fn flush(&mut self) -> io::Result<()> {
301 self.write_buffer.flush(None)
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use std::io::{Read, Write};
308
309 use super::*;
310
311 #[test]
312 fn test_loopback() {
313 let mut pipe = MockPipe::loopback(1024);
314
315 for _ in 0..1 {
317 pipe.write_all(b"").unwrap();
318 pipe.write_all(b"").unwrap();
319
320 pipe.read_exact(&mut []).unwrap();
321
322 let write_data = b"hello";
323 pipe.write_all(write_data).unwrap();
324
325 pipe.read_exact(&mut []).unwrap();
326 pipe.read_exact(&mut []).unwrap();
327
328 pipe.write_all(b"").unwrap();
329
330 pipe.read_exact(&mut []).unwrap();
331
332 let mut read_data = [0u8; 5];
333 pipe.read_exact(&mut read_data).unwrap();
334
335 pipe.write_all(b"").unwrap();
336
337 assert_eq!(&read_data, write_data);
338
339 pipe.set_timeout(Some(Duration::from_millis(100)));
341 }
342 }
343
344 #[test]
345 fn test_pair() {
346 let (mut pipe1, mut pipe2) = MockPipe::pair(1024);
347
348 let write_data = b"hello";
349 pipe1.write_all(write_data).unwrap();
350
351 let mut read_data = [0u8; 5];
352 pipe2.read_exact(&mut read_data).unwrap();
353
354 assert_eq!(&read_data, write_data);
355 }
356
357 #[test]
358 fn test_bidirectional_exchange() {
359 let (mut pipe1, mut pipe2) = MockPipe::pair(1024);
360
361 let write_data11 = b"hello";
362 pipe1.write_all(write_data11).unwrap();
363
364 assert_eq!(pipe1.write_buffer_len(), 5);
365 assert_eq!(pipe1.read_buffer_len(), 0);
366 assert_eq!(pipe2.write_buffer_len(), 0);
367 assert_eq!(pipe2.read_buffer_len(), 5);
368
369 let write_data2 = b"ok";
370 pipe2.write_all(write_data2).unwrap();
371
372 assert_eq!(pipe1.write_buffer_len(), 5);
373 assert_eq!(pipe1.read_buffer_len(), 2);
374 assert_eq!(pipe2.write_buffer_len(), 2);
375 assert_eq!(pipe2.read_buffer_len(), 5);
376
377 let write_data12 = b"world";
378 pipe1.write_all(write_data12).unwrap();
379
380 assert_eq!(pipe1.write_buffer_len(), 10);
381 assert_eq!(pipe1.read_buffer_len(), 2);
382 assert_eq!(pipe2.write_buffer_len(), 2);
383 assert_eq!(pipe2.read_buffer_len(), 10);
384
385 let mut read_data1 = [0u8; 1];
388 pipe1.read_exact(&mut read_data1).unwrap();
389
390 let mut read_data2 = [0u8; 7];
391 pipe2.read_exact(&mut read_data2).unwrap();
392
393 assert_eq!(pipe1.write_buffer_len(), 3);
394 assert_eq!(pipe1.read_buffer_len(), 1);
395 assert_eq!(pipe2.write_buffer_len(), 1);
396 assert_eq!(pipe2.read_buffer_len(), 3);
397
398 assert_eq!(&read_data1, b"o");
399 assert_eq!(&read_data2, b"hellowo");
400 }
401
402 #[test]
403 fn test_zero_capacity_buffer() {
404 let mut pipe = MockPipe::loopback(0);
405
406 for _ in 0..1 {
408 pipe.write_all(b"").unwrap();
409
410 assert_eq!(
412 pipe.write_all(b"hello").unwrap_err().kind(),
413 io::ErrorKind::WriteZero
414 );
415
416 pipe.read_exact(&mut []).unwrap();
417
418 let mut read_data = [0u8; 5];
420 assert_eq!(
421 pipe.read_exact(&mut read_data).unwrap_err().kind(),
422 io::ErrorKind::UnexpectedEof
423 );
424
425 pipe.set_timeout(Some(Duration::from_millis(100)));
427 }
428 }
429
430 #[test]
431 fn test_timeout_write() {
432 let mut pipe = MockPipe::loopback(5).with_timeout(Some(Duration::from_millis(100)));
434
435 let mut read_data = [0u8; 5];
437 assert_eq!(
438 pipe.read_exact(&mut read_data).unwrap_err().kind(),
439 io::ErrorKind::TimedOut
440 );
441
442 pipe.write_all(b"hello").unwrap();
444
445 assert_eq!(
447 pipe.write_all(b"!").unwrap_err().kind(),
448 io::ErrorKind::TimedOut
449 );
450 }
451
452 #[test]
453 fn test_buffer_clearing() {
454 let mut pipe = MockPipe::loopback(1024);
455
456 pipe.write_all(b"test").unwrap();
457
458 assert_eq!(pipe.write_buffer_len(), 4);
459 assert_eq!(pipe.read_buffer_len(), 4);
460
461 pipe.clear();
462
463 assert_eq!(pipe.write_buffer_len(), 0);
464 assert_eq!(pipe.read_buffer_len(), 0);
465
466 let mut read_data = [0u8; 1];
468 assert_eq!(
469 pipe.read_exact(&mut read_data).unwrap_err().kind(),
470 io::ErrorKind::UnexpectedEof
471 );
472 }
473
474 #[test]
475 fn test_multiple_threads() {
476 use std::{thread, time};
477
478 let (mut pipe1, mut pipe2) = MockPipe::pair(1024);
479
480 let write_data1 = b"hello";
481 let write_data2 = b"hi";
482
483 let writer = thread::spawn(move || {
484 thread::sleep(time::Duration::from_millis(100));
485
486 pipe1.write_all(write_data1).unwrap();
487 assert_eq!(pipe1.write_buffer_len(), write_data1.len());
488
489 thread::sleep(time::Duration::from_millis(100));
490
491 pipe1.write_all(write_data2).unwrap();
492 assert_eq!(pipe1.write_buffer_len(), write_data2.len());
493
494 pipe1.flush().unwrap();
495 assert_eq!(pipe1.write_buffer_len(), 0);
496 });
497
498 let reader = thread::spawn(move || {
499 pipe2.set_timeout(Some(Duration::from_millis(1000)));
500
501 let mut read_data = [0u8; 5];
502 pipe2.read_exact(&mut read_data).unwrap();
503 assert_eq!(&read_data, write_data1);
504
505 thread::sleep(time::Duration::from_millis(200));
506
507 pipe2.set_timeout(Some(Duration::ZERO));
508
509 let mut read_data = [0u8; 2];
510 pipe2.read_exact(&mut read_data).unwrap();
511 assert_eq!(&read_data, write_data2);
512 });
513
514 writer.join().unwrap();
515 reader.join().unwrap();
516 }
517}