mockpipe/
lib.rs

1//! Provides the `MockPipe` struct for exchanging data through internal circular
2//! buffers. It supports reading and writing with optional timeout functionality
3//! and is useful for testing communication mechanisms like sockets, pipes,
4//! serial ports etc.
5//
6//! # Example
7//!
8//! ```
9//! use std::io::{Read, Write};
10//!
11//! use mockpipe::MockPipe;
12//!
13//! let mut pipe = MockPipe::loopback(1024);
14//!
15//! let write_data = b"hello";
16//! pipe.write_all(write_data).unwrap();
17//!
18//! let mut read_data = [0u8; 5];
19//! pipe.read_exact(&mut read_data).unwrap();
20//!
21//! assert_eq!(&read_data, write_data);
22//! ```
23
24// To run doc tests on examples from README.md and verify their correctness
25#[cfg(doctest)]
26#[doc = include_str!("../README.md")]
27struct ReadMe;
28
29use std::{
30    collections::VecDeque,
31    io,
32    sync::{Arc, Condvar, Mutex, MutexGuard},
33    time::Duration,
34};
35
36/// A thread-safe circular buffer with synchronization primitives.
37struct SyncBuffer {
38    data: Mutex<VecDeque<u8>>,
39    can_read: Condvar,
40    can_write: Condvar,
41}
42
43impl SyncBuffer {
44    /// Creates a new `SyncBuffer` with the specified capacity.
45    fn new(capacity: usize) -> Self {
46        SyncBuffer {
47            data: Mutex::new(VecDeque::with_capacity(capacity)),
48            can_read: Condvar::new(),
49            can_write: Condvar::new(),
50        }
51    }
52
53    /// Waits until the condition function returns false.
54    ///
55    /// If successful, returns a new locked guard to the data buffer.
56    /// If a timeout is specified, returns a `TimedOut` error if the condition
57    /// is not met within the timeout duration.
58    fn wait_while<'a, F>(
59        mut data_guard: MutexGuard<'a, VecDeque<u8>>,
60        condvar: &Condvar,
61        timeout: Option<Duration>,
62        condition: F,
63    ) -> io::Result<MutexGuard<'a, VecDeque<u8>>>
64    where
65        F: Fn(&mut VecDeque<u8>) -> bool,
66    {
67        if condition(&mut data_guard) {
68            data_guard = match timeout {
69                Some(Duration::ZERO) => data_guard,
70                Some(timeout) => {
71                    let (new_guard, timeout_result) = condvar
72                        .wait_timeout_while(data_guard, timeout, condition)
73                        .map_err(|_| io::Error::from(io::ErrorKind::Other))?;
74
75                    if timeout_result.timed_out() {
76                        return Err(io::Error::from(io::ErrorKind::TimedOut));
77                    }
78
79                    new_guard
80                }
81                None => condvar
82                    .wait_while(data_guard, condition)
83                    .map_err(|_| io::Error::from(io::ErrorKind::Other))?,
84            };
85        }
86
87        Ok(data_guard)
88    }
89
90    /// Waits until the required number of bytes are available in the buffer for
91    /// reading or writing.
92    ///
93    /// If successful, returns a locked data guard and the number of bytes available.
94    /// If a timeout is specified, returns a `TimedOut` error if the required bytes
95    /// are not available within the timeout duration.
96    fn wait_for_bytes_available<F>(
97        &self,
98        bytes_required: usize,
99        condvar: &Condvar,
100        timeout: Option<Duration>,
101        get_bytes_available: F,
102    ) -> io::Result<(MutexGuard<VecDeque<u8>>, usize)>
103    where
104        F: Fn(&VecDeque<u8>) -> usize,
105    {
106        let mut data_guard = self.data.lock().unwrap();
107
108        if (bytes_required == 0) || (data_guard.capacity() == 0) {
109            return Ok((data_guard, 0));
110        }
111
112        data_guard = Self::wait_while(data_guard, condvar, timeout, |data| {
113            get_bytes_available(data) == 0
114        })?;
115
116        let bytes_available = bytes_required.min(get_bytes_available(&data_guard));
117
118        Ok((data_guard, bytes_available))
119    }
120
121    /// Reads data from the buffer.
122    ///
123    /// Blocks until the specified amount of data is available or the timeout is reached.
124    /// Returns the number of bytes read if successful.
125    fn read(&self, buf: &mut [u8], timeout: Option<Duration>) -> io::Result<usize> {
126        let (mut data_guard, bytes_to_read) =
127            self.wait_for_bytes_available(buf.len(), &self.can_read, timeout, |guard| guard.len())?;
128
129        if bytes_to_read > 0 {
130            for byte in &mut buf[0..bytes_to_read] {
131                *byte = data_guard.pop_front().unwrap();
132            }
133
134            // Notify the writer that space is available
135            self.can_write.notify_one();
136        }
137
138        Ok(bytes_to_read)
139    }
140
141    /// Writes data into the buffer.
142    ///
143    /// Blocks if there is not enough space until some space becomes available
144    /// or the timeout is reached. Returns the number of bytes written if successful.
145    fn write(&self, buf: &[u8], timeout: Option<Duration>) -> io::Result<usize> {
146        let (mut data_guard, bytes_to_write) =
147            self.wait_for_bytes_available(buf.len(), &self.can_write, timeout, |guard| {
148                guard.capacity() - guard.len()
149            })?;
150
151        if bytes_to_write > 0 {
152            data_guard.extend(&buf[0..bytes_to_write]);
153
154            // Notify the reader that data is available
155            self.can_read.notify_one();
156        }
157
158        Ok(bytes_to_write)
159    }
160
161    /// Waits until all data has been written from the buffer (blocks until the buffer is empty
162    /// or the operation times out, if a timeout is specified).
163    fn flush(&self, timeout: Option<Duration>) -> io::Result<()> {
164        // Wait until the write buffer is empty.
165        Self::wait_while(
166            self.data.lock().unwrap(),
167            &self.can_write,
168            timeout,
169            |data| !data.is_empty(),
170        )
171        .map(|_| ())
172    }
173
174    /// Clears the buffer, discarding all pending data and notifying waiting writers.
175    fn clear(&self) {
176        self.data.lock().unwrap().clear();
177        self.can_write.notify_all();
178    }
179
180    /// Returns the number of bytes available to read.
181    fn len(&self) -> usize {
182        self.data.lock().unwrap().len()
183    }
184}
185
186/// A bidirectional data pipe that exchanges datausing internal circular buffers.
187/// It provides functionality for reading and writing data with timeout support.
188/// Can be used in loopback mode or as a paired connection between two endpoints.
189///
190/// This structure is intended for implementing virtual sockets, pipes, serial
191/// ports, or similar communication mechanisms, abstracting away the details of
192/// buffer management and synchronization.
193#[derive(Clone)]
194pub struct MockPipe {
195    /// Timeout duration for read and write operations.
196    ///
197    /// - `None` means the operation blocks indefinitely.
198    /// - `Some(Duration::ZERO)` means the operation is non-blocking.
199    /// - `Some(Duration)` sets a specific timeout duration.
200    timeout: Arc<Mutex<Option<Duration>>>,
201
202    /// Buffer used for reading data.
203    read_buffer: Arc<SyncBuffer>,
204
205    /// Buffer used for writing data.
206    write_buffer: Arc<SyncBuffer>,
207}
208
209impl MockPipe {
210    /// Creates a `MockPipe` instance from separate read and write buffers.
211    fn from_buffers(read_buffer: Arc<SyncBuffer>, write_buffer: Arc<SyncBuffer>) -> Self {
212        Self {
213            // Non-blocking by default
214            timeout: Arc::new(Mutex::new(Some(Duration::ZERO))),
215            read_buffer,
216            write_buffer,
217        }
218    }
219
220    /// Creates a `MockPipe` in loopback mode, where the same buffer is used
221    /// for both reading and writing. This is useful for simulating a simple
222    /// communication scenario where data written to the pipe can be immediately
223    /// read back.
224    pub fn loopback(buffer_capacity: usize) -> Self {
225        let buffer = Arc::new(SyncBuffer::new(buffer_capacity));
226        Self::from_buffers(buffer.clone(), buffer)
227    }
228
229    /// Creates a linked pair of `MockPipe` instances, allowing data written
230    /// to one pipe to be read from the other. This simulates a full-duplex
231    /// communication channel between two endpoints.
232    pub fn pair(buffer_capacity: usize) -> (Self, Self) {
233        let buffer1 = Arc::new(SyncBuffer::new(buffer_capacity));
234        let buffer2 = Arc::new(SyncBuffer::new(buffer_capacity));
235
236        let pipe1 = Self::from_buffers(buffer1.clone(), buffer2.clone());
237        let pipe2 = Self::from_buffers(buffer2, buffer1);
238
239        (pipe1, pipe2)
240    }
241
242    /// Gets the current timeout duration for read/write operations.
243    pub fn timeout(&self) -> Option<Duration> {
244        *self.timeout.lock().unwrap()
245    }
246
247    /// Sets the timeout duration for read/write operations.
248    ///
249    /// `None` means the operation blocks indefinitely. `Some(Duration::ZERO)` means
250    /// the operation is non-blocking.
251    pub fn set_timeout(&self, timeout: Option<Duration>) {
252        *self.timeout.lock().unwrap() = timeout;
253    }
254
255    /// Sets the timeout duration for read/write operations and returns the modified
256    /// `MockPipe`.
257    pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
258        self.set_timeout(timeout);
259        self
260    }
261
262    /// Returns the number of bytes currently available to read from the buffer.
263    pub fn read_buffer_len(&self) -> usize {
264        self.read_buffer.len()
265    }
266
267    /// Returns the number of bytes currently queued to write in the buffer.
268    pub fn write_buffer_len(&self) -> usize {
269        self.write_buffer.len()
270    }
271
272    /// Clears the read buffer, discarding all pending data.
273    pub fn clear_read(&self) {
274        self.read_buffer.clear();
275    }
276
277    /// Clears the write buffer, discarding all pending data.
278    pub fn clear_write(&self) {
279        self.write_buffer.clear();
280    }
281
282    /// Clears both read and write buffers, discarding all pending data.
283    pub fn clear(&self) {
284        self.clear_read();
285        self.clear_write();
286    }
287}
288
289impl io::Read for MockPipe {
290    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
291        self.read_buffer.read(buf, self.timeout())
292    }
293}
294
295impl io::Write for MockPipe {
296    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
297        self.write_buffer.write(buf, self.timeout())
298    }
299
300    fn flush(&mut self) -> io::Result<()> {
301        self.write_buffer.flush(None)
302    }
303}
304
305#[cfg(test)]
306mod tests {
307    use std::io::{Read, Write};
308
309    use super::*;
310
311    #[test]
312    fn test_loopback() {
313        let mut pipe = MockPipe::loopback(1024);
314
315        // Two test passes: without and with timeout
316        for _ in 0..1 {
317            pipe.write_all(b"").unwrap();
318            pipe.write_all(b"").unwrap();
319
320            pipe.read_exact(&mut []).unwrap();
321
322            let write_data = b"hello";
323            pipe.write_all(write_data).unwrap();
324
325            pipe.read_exact(&mut []).unwrap();
326            pipe.read_exact(&mut []).unwrap();
327
328            pipe.write_all(b"").unwrap();
329
330            pipe.read_exact(&mut []).unwrap();
331
332            let mut read_data = [0u8; 5];
333            pipe.read_exact(&mut read_data).unwrap();
334
335            pipe.write_all(b"").unwrap();
336
337            assert_eq!(&read_data, write_data);
338
339            // Set a timeout for the next pass
340            pipe.set_timeout(Some(Duration::from_millis(100)));
341        }
342    }
343
344    #[test]
345    fn test_pair() {
346        let (mut pipe1, mut pipe2) = MockPipe::pair(1024);
347
348        let write_data = b"hello";
349        pipe1.write_all(write_data).unwrap();
350
351        let mut read_data = [0u8; 5];
352        pipe2.read_exact(&mut read_data).unwrap();
353
354        assert_eq!(&read_data, write_data);
355    }
356
357    #[test]
358    fn test_bidirectional_exchange() {
359        let (mut pipe1, mut pipe2) = MockPipe::pair(1024);
360
361        let write_data11 = b"hello";
362        pipe1.write_all(write_data11).unwrap();
363
364        assert_eq!(pipe1.write_buffer_len(), 5);
365        assert_eq!(pipe1.read_buffer_len(), 0);
366        assert_eq!(pipe2.write_buffer_len(), 0);
367        assert_eq!(pipe2.read_buffer_len(), 5);
368
369        let write_data2 = b"ok";
370        pipe2.write_all(write_data2).unwrap();
371
372        assert_eq!(pipe1.write_buffer_len(), 5);
373        assert_eq!(pipe1.read_buffer_len(), 2);
374        assert_eq!(pipe2.write_buffer_len(), 2);
375        assert_eq!(pipe2.read_buffer_len(), 5);
376
377        let write_data12 = b"world";
378        pipe1.write_all(write_data12).unwrap();
379
380        assert_eq!(pipe1.write_buffer_len(), 10);
381        assert_eq!(pipe1.read_buffer_len(), 2);
382        assert_eq!(pipe2.write_buffer_len(), 2);
383        assert_eq!(pipe2.read_buffer_len(), 10);
384
385        // Partial reads
386
387        let mut read_data1 = [0u8; 1];
388        pipe1.read_exact(&mut read_data1).unwrap();
389
390        let mut read_data2 = [0u8; 7];
391        pipe2.read_exact(&mut read_data2).unwrap();
392
393        assert_eq!(pipe1.write_buffer_len(), 3);
394        assert_eq!(pipe1.read_buffer_len(), 1);
395        assert_eq!(pipe2.write_buffer_len(), 1);
396        assert_eq!(pipe2.read_buffer_len(), 3);
397
398        assert_eq!(&read_data1, b"o");
399        assert_eq!(&read_data2, b"hellowo");
400    }
401
402    #[test]
403    fn test_zero_capacity_buffer() {
404        let mut pipe = MockPipe::loopback(0);
405
406        // Two test passes: without and with timeout
407        for _ in 0..1 {
408            pipe.write_all(b"").unwrap();
409
410            // Attempt to write to a zero-capacity buffer should fail
411            assert_eq!(
412                pipe.write_all(b"hello").unwrap_err().kind(),
413                io::ErrorKind::WriteZero
414            );
415
416            pipe.read_exact(&mut []).unwrap();
417
418            // Attempt to read from a zero-capacity buffer should fail
419            let mut read_data = [0u8; 5];
420            assert_eq!(
421                pipe.read_exact(&mut read_data).unwrap_err().kind(),
422                io::ErrorKind::UnexpectedEof
423            );
424
425            // Set a timeout for the next pass
426            pipe.set_timeout(Some(Duration::from_millis(100)));
427        }
428    }
429
430    #[test]
431    fn test_timeout_write() {
432        // Small buffer
433        let mut pipe = MockPipe::loopback(5).with_timeout(Some(Duration::from_millis(100)));
434
435        // Try to read from empty buffer; should timeout
436        let mut read_data = [0u8; 5];
437        assert_eq!(
438            pipe.read_exact(&mut read_data).unwrap_err().kind(),
439            io::ErrorKind::TimedOut
440        );
441
442        // Fill the buffer
443        pipe.write_all(b"hello").unwrap();
444
445        // Attempt to write more data should cause timeout
446        assert_eq!(
447            pipe.write_all(b"!").unwrap_err().kind(),
448            io::ErrorKind::TimedOut
449        );
450    }
451
452    #[test]
453    fn test_buffer_clearing() {
454        let mut pipe = MockPipe::loopback(1024);
455
456        pipe.write_all(b"test").unwrap();
457
458        assert_eq!(pipe.write_buffer_len(), 4);
459        assert_eq!(pipe.read_buffer_len(), 4);
460
461        pipe.clear();
462
463        assert_eq!(pipe.write_buffer_len(), 0);
464        assert_eq!(pipe.read_buffer_len(), 0);
465
466        // The pipe is empty, so reading should timeout
467        let mut read_data = [0u8; 1];
468        assert_eq!(
469            pipe.read_exact(&mut read_data).unwrap_err().kind(),
470            io::ErrorKind::UnexpectedEof
471        );
472    }
473
474    #[test]
475    fn test_multiple_threads() {
476        use std::{thread, time};
477
478        let (mut pipe1, mut pipe2) = MockPipe::pair(1024);
479
480        let write_data1 = b"hello";
481        let write_data2 = b"hi";
482
483        let writer = thread::spawn(move || {
484            thread::sleep(time::Duration::from_millis(100));
485
486            pipe1.write_all(write_data1).unwrap();
487            assert_eq!(pipe1.write_buffer_len(), write_data1.len());
488
489            thread::sleep(time::Duration::from_millis(100));
490
491            pipe1.write_all(write_data2).unwrap();
492            assert_eq!(pipe1.write_buffer_len(), write_data2.len());
493
494            pipe1.flush().unwrap();
495            assert_eq!(pipe1.write_buffer_len(), 0);
496        });
497
498        let reader = thread::spawn(move || {
499            pipe2.set_timeout(Some(Duration::from_millis(1000)));
500
501            let mut read_data = [0u8; 5];
502            pipe2.read_exact(&mut read_data).unwrap();
503            assert_eq!(&read_data, write_data1);
504
505            thread::sleep(time::Duration::from_millis(200));
506
507            pipe2.set_timeout(Some(Duration::ZERO));
508
509            let mut read_data = [0u8; 2];
510            pipe2.read_exact(&mut read_data).unwrap();
511            assert_eq!(&read_data, write_data2);
512        });
513
514        writer.join().unwrap();
515        reader.join().unwrap();
516    }
517}