tokio_io_mock_fork/
lib.rs

1#![cfg_attr(docsrs_alt, feature(doc_cfg))]
2//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
3//!
4//! (fork of [`tokio_test::io`](https://docs.rs/tokio-test/latest/tokio_test/io/index.html)
5//!
6//!
7//! # Overview
8//!
9//! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured
10//! to handle an arbitrary sequence of read and write operations. This is useful
11//! for writing unit tests for networking services as using an actual network
12//! type is fairly non deterministic.
13//!
14//! # Usage
15//!
16//! Attempting to write data that the mock isn't expecting will result in a
17//! panic.
18//!
19//! [`AsyncRead`]: tokio::io::AsyncRead
20//! [`AsyncWrite`]: tokio::io::AsyncWrite
21
22use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
23use tokio::sync::mpsc;
24use tokio::time::{self, Duration, Instant, Sleep};
25use tokio_stream::wrappers::UnboundedReceiverStream;
26
27use futures_core::Stream;
28use std::collections::VecDeque;
29use std::fmt;
30use std::future::Future;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{self, ready, Poll, Waker};
34use std::{cmp, io};
35
36#[cfg(feature = "text-scenarios")]
37mod text_scenario;
38
39#[cfg(feature = "text-scenarios")]
40#[cfg_attr(docsrs_alt, doc(cfg(feature = "text-scenarios")))]
41pub use text_scenario::ParseError;
42
43#[cfg(feature = "panicless-mode")]
44#[cfg_attr(docsrs_alt, doc(cfg(feature = "panicless-mode")))]
45/// Details of an error detected by mock when in panicless mode.
46#[derive(Debug, PartialEq, Eq, Clone, Copy)]
47pub enum MockOutcomeError {
48    UnexpectedWrite,
49    WriteInsteadOfShutdown,
50    ShutdownInsteadOfWrite,
51    WrittenByteMismatch { expected: u8, actual: u8 },
52    RemainingUnreadData,
53    RemainingUnwrittenData,
54    Other,
55}
56
57#[cfg(feature = "panicless-mode")]
58impl std::fmt::Display for MockOutcomeError {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        match self {
61            MockOutcomeError::UnexpectedWrite => f.write_str("unexpected write"),
62            MockOutcomeError::WrittenByteMismatch { expected, actual } => {
63                write!(f, "mismatching byte, expected {expected}, got {actual}")
64            }
65            MockOutcomeError::RemainingUnreadData => f.write_str("data remains to be read"),
66            MockOutcomeError::RemainingUnwrittenData => f.write_str("data remains to be written"),
67            MockOutcomeError::Other => f.write_str("other error"),
68            MockOutcomeError::WriteInsteadOfShutdown => {
69                f.write_str("write where shutdown was expected")
70            }
71            MockOutcomeError::ShutdownInsteadOfWrite => {
72                f.write_str("shutdown where a write was expected")
73            }
74        }
75    }
76}
77
78#[cfg(feature = "panicless-mode")]
79#[cfg_attr(docsrs_alt, doc(cfg(feature = "panicless-mode")))]
80#[derive(Debug, PartialEq, Eq, Clone, Copy)]
81/// Final report of a a mock when used in panicless mode
82pub struct MockOutcome {
83    /// `Ok` if the mock was dropped without an encountered error, `Err` otherwise
84    pub outcome: Result<(), MockOutcomeError>,
85
86    pub total_read_bytes: u64,
87    pub total_written_bytes: u64,
88}
89#[cfg(feature = "panicless-mode")]
90impl std::fmt::Display for MockOutcome {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        match self.outcome {
93            Ok(()) => write!(
94                f,
95                "success after reading {} and writing {} bytes",
96                self.total_read_bytes, self.total_written_bytes
97            ),
98            Err(e) => write!(
99                f,
100                "error ({e}) after reading {} and writing {} bytes",
101                self.total_read_bytes, self.total_written_bytes
102            ),
103        }
104    }
105}
106
107/// An I/O object that follows a predefined script.
108///
109/// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It
110/// follows the scenario described by the builder and panics otherwise.
111#[derive(Debug)]
112pub struct Mock {
113    inner: Inner,
114}
115
116/// A handle to send additional actions to the related `Mock`.
117#[derive(Debug)]
118pub struct Handle {
119    tx: mpsc::UnboundedSender<Action>,
120}
121
122/// Builds `Mock` instances.
123#[derive(Debug, Clone, Default)]
124pub struct Builder {
125    // Sequence of actions for the Mock to take
126    actions: VecDeque<Action>,
127    name: String,
128    shutdown_checking_enabled: bool,
129}
130
131#[derive(Debug, Clone)]
132enum Action {
133    Read(Vec<u8>),
134    Write(Vec<u8>),
135    Wait(Duration),
136    // Wrapped in Arc so that Builder can be cloned and Send.
137    // Mock is not cloned as does not need to check Rc for ref counts.
138    ReadError(Option<Arc<io::Error>>),
139    WriteError(Option<Arc<io::Error>>),
140    WriteShutdown(bool),
141    ReadZeroes(usize),
142    WriteZeroes(usize),
143    IgnoreWritten(usize),
144    ReadEof(bool),
145    StopChecking,
146}
147
148struct Inner {
149    actions: VecDeque<Action>,
150    waiting: Option<Instant>,
151    sleep: Option<Pin<Box<Sleep>>>,
152    read_wait: Option<Waker>,
153    rx: UnboundedReceiverStream<Action>,
154    name: String,
155    checks_enabled: bool,
156    read_bytes: u64,
157    written_bytes: u64,
158    shutdown_checking_enabled: bool,
159    #[cfg(feature = "panicless-mode")]
160    panicless_tx: Option<tokio::sync::oneshot::Sender<MockOutcome>>,
161}
162
163impl Builder {
164    /// Return a new, empty `Builder`.
165    pub fn new() -> Self {
166        Self::default()
167    }
168
169    /// Sequence a `read` operation.
170    ///
171    /// The next operation in the mock's script will be to expect a `read` call
172    /// and return `buf`.
173    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
174        self.actions.push_back(Action::Read(buf.into()));
175        self
176    }
177
178    /// Sequence a `read` operation, resuting in a specified number of zero bytes.
179    ///
180    /// Same as [`Self::read`] with aa zero buffer, but with less memory consumption.
181    ///
182    /// The next operation in the mock's script will be to expect a `read` call.
183    pub fn read_zeroes(&mut self, nbytes: usize) -> &mut Self {
184        self.actions.push_back(Action::ReadZeroes(nbytes));
185        self
186    }
187
188    /// Sequence a read operation that will return 0, i.e. end of file.
189    ///
190    /// The next operation in the mock's script will be to expect a `read` call.
191    pub fn eof(&mut self) -> &mut Self {
192        self.actions.push_back(Action::ReadEof(false));
193        self
194    }
195
196    /// Sequence a `read` operation that produces an error.
197    ///
198    /// The next operation in the mock's script will be to expect a `read` call
199    /// and return `error`.
200    pub fn read_error(&mut self, error: io::Error) -> &mut Self {
201        let error = Some(error.into());
202        self.actions.push_back(Action::ReadError(error));
203        self
204    }
205
206    /// Sequence a `write` operation.
207    ///
208    /// The next operation in the mock's script will be to expect a `write`
209    /// call.
210    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
211        self.actions.push_back(Action::Write(buf.into()));
212        self
213    }
214
215    /// Sequence a `write` operation.
216    ///
217    /// The next operation in the mock's script will be to expect a `write`
218    /// call. The written bytes will be asserted to be equal to zero.
219    pub fn write_zeroes(&mut self, nbytes: usize) -> &mut Self {
220        self.actions.push_back(Action::WriteZeroes(nbytes));
221        self
222    }
223
224    /// Sequence a `write` operation.
225    ///
226    /// The next operation in the mock's script will be to expect a `write`
227    /// call. The written bytes will not be checked.
228    ///
229    /// Note that while content of the bytes are ignored,
230    /// specified number of bytes still must be written to the mock to avoid the failure.
231    pub fn write_ignore(&mut self, nbytes: usize) -> &mut Self {
232        self.actions.push_back(Action::IgnoreWritten(nbytes));
233        self
234    }
235
236    /// Sequence a `write` operation that produces an error.
237    ///
238    /// The next operation in the mock's script will be to expect a `write`
239    /// call that provides `error`.
240    pub fn write_error(&mut self, error: io::Error) -> &mut Self {
241        let error = Some(error.into());
242        self.actions.push_back(Action::WriteError(error));
243        self
244    }
245
246    /// Check that [`AsyncWrite::poll_shutdown`]s happen where needed.
247    ///
248    /// It is disabled by default for compatibility with `tokio_test::io::Mock`.
249    pub fn enable_shutdown_checking(&mut self) -> &mut Self {
250        self.shutdown_checking_enabled = true;
251        self
252    }
253
254    /// Sequence a `shutdown` operation that corresponds to an expected [`AsyncWrite::poll_shutdown`] call.
255    ///
256    /// Automatically does [`Self::enable_shutdown_checking`].
257    pub fn write_shutdown(&mut self) -> &mut Self {
258        self.shutdown_checking_enabled = true;
259        self.actions.push_back(Action::WriteShutdown(false));
260        self
261    }
262
263    /// Sequence a special event that makes this Mock stop asserting any operation (i.e. allow everything).
264    ///
265    /// Reaching this means the test is already succeeded and possible
266    /// further operations are likely irrelevent.
267    ///
268    /// More reads can still be sequenced after this.
269    pub fn stop_checking(&mut self) -> &mut Self {
270        self.actions.push_back(Action::StopChecking);
271        self
272    }
273
274    /// Sequence a wait.
275    ///
276    /// The next operation in the mock's script will be to wait without doing so
277    /// for `duration` amount of time.
278    pub fn wait(&mut self, duration: Duration) -> &mut Self {
279        let duration = cmp::max(duration, Duration::from_millis(1));
280        self.actions.push_back(Action::Wait(duration));
281        self
282    }
283
284    /// Set name of the mock IO object to include in panic messages and debug output
285    pub fn name(&mut self, name: impl Into<String>) -> &mut Self {
286        self.name = name.into();
287        self
288    }
289
290    /// Sequence multiple operations using a special text scenario.
291    ///
292    /// Text scenario commands consist of a command character, subcommand
293    /// character (if needed), content (if needed) and `|` terminator/separator.
294    ///
295    /// Parsing is not very strict.
296    ///
297    /// Commands:
298    ///
299    /// * `R` - sequence specified bytes to be read
300    /// * `W` - sequence specified bytes to be written
301    /// * `ZR` - sequence specified number of zero bytes to be read
302    /// * `ZW` - sequence specified number of zero bytes to be written
303    /// * `I` - sequence specified number of written bytes to be ignored
304    /// * `X` - sequence a EOF event
305    /// * `Q` - sequence stop_checking event
306    /// * `ER` - sequence a read error
307    /// * `EW` - sequence a write error
308    /// * `D` - sequence a write shutdown
309    /// * `T` - sequence a sleep for a specified number of milliseconds
310    /// * `N` - set name of this mock stream object
311    ///
312    /// Most characters after `R` or `W` or `N` become content of the buffer.
313    /// Exceptions: `|` character that marks end of command, leading (but not trailing) whitespace
314    /// and escape sequences starting with `\`, like `\n` or `\xFF`.
315    ///
316    /// Numbers for `ZR`, `ZW`, `I` or `T` can be prefixed with `x` or `0x` to use hex instead of dec.
317    ///
318    /// Examples:
319    ///
320    /// * `R hello|W world` is equivalent to `.read(b"hello").write(b"world")`
321    /// * `R \x55\x00| ZR x5500| R \x00\x03| ZR 3` is equivalent to `.read(b"\x55\x00").read_zeroes(0x5500).read(b"\x00\x03").read_zeroes(3)`
322    /// * `R cat; echo qqq\n| R 1234|W 1234| R 56|W 56| X | W qqq|`
323    /// * `N calc|R 2+2|W 4|R 3+$RANDOM|Q`
324    /// * `R PING|W PONG|T5000|R PING|W PONG|T5000|ER`
325    ///
326    /// Mnemonics/explanations:
327    ///
328    /// * `T` - timeout, timer
329    /// * `ZR` instead of `RZ` because of `Z` would be interpreted as a content of `R`
330    /// * `Q` - quiet mode, quit checking, quash all assertions
331    /// * `X` - eXit reading, eXtend writer span when reading is already finished. `E` is already busy for injected errors.
332    /// * `D` - shutDown, `^D`` to send EOF in console
333    #[cfg(feature = "text-scenarios")]
334    #[cfg_attr(docsrs_alt, doc(cfg(feature = "text-scenarios")))]
335    pub fn text_scenario(&mut self, scenario: &str) -> Result<&mut Self, ParseError> {
336        let (items, name) = text_scenario::parse_text_scenario(scenario)?;
337        if let Some(name) = name {
338            self.name = name;
339        }
340        self.actions.extend(items);
341        Ok(self)
342    }
343
344    /// Build a `Mock` value according to the defined script.
345    pub fn build(&mut self) -> Mock {
346        let (mock, _) = self.build_with_handle();
347        mock
348    }
349
350    /// Build a `Mock` value paired with a handle
351    pub fn build_with_handle(&mut self) -> (Mock, Handle) {
352        let (inner, handle) = Inner::new(
353            self.actions.clone(),
354            self.name.clone(),
355            self.shutdown_checking_enabled,
356        );
357
358        let mock = Mock { inner };
359
360        (mock, handle)
361    }
362
363    /// Build a `Mock` value that should not normally panic.
364    ///
365    /// Instead of panicking when a mismatch is detected, it should signal the outcome to the channel.
366    #[cfg(feature = "panicless-mode")]
367    #[cfg_attr(docsrs_alt, doc(cfg(feature = "panicless-mode")))]
368    pub fn build_panicless(
369        &mut self,
370    ) -> (Mock, Handle, tokio::sync::oneshot::Receiver<MockOutcome>) {
371        let (mut inner, handle) = Inner::new(
372            self.actions.clone(),
373            self.name.clone(),
374            self.shutdown_checking_enabled,
375        );
376
377        let (tx, rx) = tokio::sync::oneshot::channel();
378
379        inner.panicless_tx = Some(tx);
380
381        let mock = Mock { inner };
382
383        (mock, handle, rx)
384    }
385}
386
387impl Handle {
388    /// Sequence a `read` operation.
389    ///
390    /// The next operation in the mock's script will be to expect a `read` call
391    /// and return `buf`.
392    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
393        self.tx.send(Action::Read(buf.into())).unwrap();
394        self
395    }
396
397    /// Sequence a `read` operation, resuting in a specified number of zero bytes.
398    ///
399    /// Same as [`Self::read`] with aa zero buffer, but with less memory consumption.
400    ///
401    /// The next operation in the mock's script will be to expect a `read` call.
402    pub fn read_zeroes(&mut self, nbytes: usize) -> &mut Self {
403        self.tx.send(Action::ReadZeroes(nbytes)).unwrap();
404        self
405    }
406
407    /// Sequence a read operation that will return 0, i.e. end of file.
408    ///
409    /// The next operation in the mock's script will be to expect a `read` call.
410    pub fn eof(&mut self) -> &mut Self {
411        self.tx.send(Action::ReadEof(false)).unwrap();
412        self
413    }
414
415    /// Sequence a `read` operation error.
416    ///
417    /// The next operation in the mock's script will be to expect a `read` call
418    /// and return `error`.
419    pub fn read_error(&mut self, error: io::Error) -> &mut Self {
420        let error = Some(error.into());
421        self.tx.send(Action::ReadError(error)).unwrap();
422        self
423    }
424
425    /// Sequence a `write` operation.
426    ///
427    /// The next operation in the mock's script will be to expect a `write`
428    /// call.
429    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
430        self.tx.send(Action::Write(buf.into())).unwrap();
431        self
432    }
433
434    /// Sequence a `write` operation.
435    ///
436    /// The next operation in the mock's script will be to expect a `write`
437    /// call. The written bytes will be asserted to be equal to zero.
438    pub fn write_zeroes(&mut self, nbytes: usize) -> &mut Self {
439        self.tx.send(Action::WriteZeroes(nbytes)).unwrap();
440        self
441    }
442
443    /// Sequence a `write` operation.
444    ///
445    /// The next operation in the mock's script will be to expect a `write`
446    /// call. The written bytes will not be checked.
447    ///
448    /// Note that while content of the bytes are ignored,
449    /// specified number of bytes still must be written to the mock to avoid the failure.
450    pub fn write_ignore(&mut self, nbytes: usize) -> &mut Self {
451        self.tx.send(Action::IgnoreWritten(nbytes)).unwrap();
452        self
453    }
454
455    /// Sequence a `write` operation error.
456    ///
457    /// The next operation in the mock's script will be to expect a `write`
458    /// call error.
459    pub fn write_error(&mut self, error: io::Error) -> &mut Self {
460        let error = Some(error.into());
461        self.tx.send(Action::WriteError(error)).unwrap();
462        self
463    }
464
465    /// Sequence a `shutdown` operation that corresponds to an expected [`AsyncWrite::poll_shutdown`] call.
466    ///
467    /// Note that unlike with [`Builder::write_shutdown`] you need to do [`Builder::enable_shutdown_checking`].explicitly.
468    pub fn write_shutdown(&mut self) -> &mut Self {
469        self.tx.send(Action::WriteShutdown(false)).unwrap();
470        self
471    }
472
473    /// Sequence a special event that makes this Mock stop asserting any operation (i.e. allow everything).
474    ///
475    /// Reaching this means the test is already succeeded and possible
476    /// further operations are likely irrelevent.
477    ///
478    /// More reads can still be sequenced after this.
479    pub fn stop_checking(&mut self) -> &mut Self {
480        self.tx.send(Action::StopChecking).unwrap();
481        self
482    }
483
484    /// Sequence multiple operations using a special text scenario.
485    ///
486    /// See [`Builder::text_scenario`] for the description of the text content.
487    ///
488    /// Note that `N` (set name) command is ignored.
489    #[cfg(feature = "text-scenarios")]
490    #[cfg_attr(docsrs_alt, doc(cfg(feature = "text-scenarios")))]
491    pub fn text_scenario(&mut self, scenario: &str) -> Result<&mut Self, ParseError> {
492        let (items, name) = text_scenario::parse_text_scenario(scenario)?;
493        if let Some(_name) = name {
494            // ignoring the name in this context
495        }
496        for x in items {
497            self.tx.send(x).unwrap();
498        }
499        Ok(self)
500    }
501}
502
503impl Inner {
504    fn new(
505        actions: VecDeque<Action>,
506        name: String,
507        shutdown_checking_enabled: bool,
508    ) -> (Inner, Handle) {
509        let (tx, rx) = mpsc::unbounded_channel();
510
511        let rx = UnboundedReceiverStream::new(rx);
512
513        let inner = Inner {
514            actions,
515            sleep: None,
516            read_wait: None,
517            rx,
518            waiting: None,
519            name,
520            checks_enabled: true,
521            read_bytes: 0,
522            written_bytes: 0,
523            shutdown_checking_enabled,
524            #[cfg(feature = "panicless-mode")]
525            panicless_tx: None,
526        };
527
528        let handle = Handle { tx };
529
530        (inner, handle)
531    }
532
533    fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
534        Pin::new(&mut self.rx).poll_next(cx)
535    }
536
537    fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
538        match self.action() {
539            Some(&mut Action::ReadZeroes(ref mut nbytes)) => {
540                let n = cmp::min(dst.remaining(), *nbytes);
541                let nfilled = dst.filled().len();
542                dst.initialize_unfilled_to(nfilled + n)[nfilled..].fill(0);
543                dst.set_filled(nfilled + n);
544                *nbytes -= n;
545                self.read_bytes += n as u64;
546                Ok(())
547            }
548            Some(&mut Action::ReadEof(ref mut observed)) => {
549                *observed = true;
550                Ok(())
551            }
552            Some(&mut Action::Read(ref mut data)) => {
553                // Figure out how much to copy
554                let n = cmp::min(dst.remaining(), data.len());
555
556                // Copy the data into the `dst` slice
557                dst.put_slice(&data[..n]);
558
559                // Drain the data from the source
560                data.drain(..n);
561
562                self.read_bytes += n as u64;
563
564                Ok(())
565            }
566            Some(&mut Action::ReadError(ref mut err)) => {
567                // As the
568                let err = err.take().expect("Should have been removed from actions.");
569                let err = Arc::try_unwrap(err).expect("There are no other references.");
570                Err(err)
571            }
572            Some(_) => {
573                // Either waiting or expecting a write
574                Err(io::ErrorKind::WouldBlock.into())
575            }
576            None => Ok(()),
577        }
578    }
579
580    fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
581        let mut ret = 0;
582
583        if self.actions.is_empty() {
584            return Err(io::ErrorKind::BrokenPipe.into());
585        }
586
587        if let Some(&mut Action::Wait(..)) = self.action() {
588            return Err(io::ErrorKind::WouldBlock.into());
589        }
590
591        if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
592            let err = err.take().expect("Should have been removed from actions.");
593            let err = Arc::try_unwrap(err).expect("There are no other references.");
594            return Err(err);
595        }
596
597        let mut checks_enabled = self.checks_enabled;
598
599        let n_remaining_actions = self.actions.len();
600        for i in 0..n_remaining_actions {
601            let action = &mut self.actions[i];
602            let ignore_written = matches!(action, Action::IgnoreWritten { .. });
603            match action {
604                Action::Write(ref mut expect) => {
605                    let n = cmp::min(src.len(), expect.len());
606
607                    #[allow(unused_mut)]
608                    let mut already_checked = false;
609
610                    #[cfg(feature = "panicless-mode")]
611                    if checks_enabled && self.panicless_tx.is_some() {
612                        for i in 0..n {
613                            let expected = expect[i];
614                            let actual = src[i];
615                            if expected != actual {
616                                let _ = self.panicless_tx.take().unwrap().send(MockOutcome {
617                                    outcome: Err(MockOutcomeError::WrittenByteMismatch {
618                                        expected,
619                                        actual,
620                                    }),
621                                    total_read_bytes: self.read_bytes,
622                                    total_written_bytes: self.written_bytes,
623                                });
624                                self.checks_enabled = false;
625                            }
626                            self.written_bytes += 1;
627                        }
628                        already_checked = true;
629                    }
630
631                    if self.checks_enabled && !already_checked {
632                        assert_eq!(
633                            &src[..n],
634                            &expect[..n],
635                            "name={} r={} w={} remaining actions: {}",
636                            self.name,
637                            self.read_bytes,
638                            self.written_bytes,
639                            n_remaining_actions - i
640                        );
641                        self.written_bytes += n as u64;
642                    }
643                    if !already_checked {
644                        self.written_bytes += n as u64;
645                    }
646
647                    // Drop data that was matched
648                    expect.drain(..n);
649                    src = &src[n..];
650
651                    ret += n;
652
653                    if src.is_empty() {
654                        return Ok(ret);
655                    }
656                }
657                Action::WriteZeroes(ref mut nbytes) | Action::IgnoreWritten(ref mut nbytes) => {
658                    let n = cmp::min(src.len(), *nbytes);
659
660                    #[allow(unused_mut)]
661                    let mut already_checked = false;
662
663                    #[cfg(feature = "panicless-mode")]
664                    if checks_enabled && !ignore_written && self.panicless_tx.is_some() {
665                        for i in 0..n {
666                            let expected = 0;
667                            let actual = src[i];
668                            if expected != actual {
669                                // lifetimes block refactoring here
670                                let _ = self.panicless_tx.take().unwrap().send(MockOutcome {
671                                    outcome: Err(MockOutcomeError::WrittenByteMismatch {
672                                        expected,
673                                        actual,
674                                    }),
675                                    total_read_bytes: self.read_bytes,
676                                    total_written_bytes: self.written_bytes,
677                                });
678                                self.checks_enabled = false;
679                            }
680                            self.written_bytes += 1;
681                        }
682                        already_checked = true;
683                    }
684
685                    if checks_enabled && !ignore_written && !already_checked {
686                        for (j, x) in src[..n].iter().enumerate() {
687                            self.written_bytes += 1;
688                            assert_eq!(
689                                *x,
690                                0,
691                                "byte_index={j} r={} w={} name={} remaining actions: {}",
692                                self.read_bytes,
693                                self.written_bytes,
694                                self.name,
695                                n_remaining_actions - i
696                            );
697                        }
698                    } else {
699                        if !already_checked {
700                            self.written_bytes += n as u64;
701                        }
702                    }
703
704                    // Drop data that was matched
705                    *nbytes -= n;
706                    src = &src[n..];
707
708                    ret += n;
709
710                    if src.is_empty() {
711                        return Ok(ret);
712                    }
713                }
714                Action::WriteShutdown(ref mut observed) => {
715                    #[cfg(feature = "panicless-mode")]
716                    if checks_enabled && self.panicless_tx.is_some() {
717                        let _ = self.panicless_tx.take().unwrap().send(MockOutcome {
718                            outcome: Err(MockOutcomeError::WriteInsteadOfShutdown),
719                            total_read_bytes: self.read_bytes,
720                            total_written_bytes: self.written_bytes,
721                        });
722                        self.checks_enabled = false;
723                        *observed = true;
724                        return Err(io::ErrorKind::InvalidData.into());
725                    }
726
727                    if checks_enabled {
728                        panic!(
729                                "unexpected write (a shutdown was expected) r={} w={} name={} remaining actions: {}",
730                                self.read_bytes,
731                                self.written_bytes,
732                                self.name,
733                                n_remaining_actions - i
734                            );
735                    } else {
736                        *observed = true;
737                        return Err(io::ErrorKind::InvalidData.into());
738                    }
739                }
740                Action::StopChecking => checks_enabled = false,
741                Action::Wait(..) | Action::WriteError(..) => {
742                    break;
743                }
744                _ => {}
745            }
746
747            // TODO: remove write
748        }
749
750        Ok(ret)
751    }
752
753    fn shutdown(&mut self) -> io::Result<()> {
754        if self.actions.is_empty() {
755            return Ok(());
756        }
757
758        if let Some(&mut Action::Wait(..)) = self.action() {
759            return Err(io::ErrorKind::WouldBlock.into());
760        }
761
762        let mut checks_enabled = self.checks_enabled;
763
764        let n_remaining_actions = self.actions.len();
765        for i in 0..n_remaining_actions {
766            let action = &mut self.actions[i];
767            let mut pending_writes = false;
768            match action {
769                Action::Write(ref buf) if !buf.is_empty() => pending_writes = true,
770                Action::IgnoreWritten(nbytes) if *nbytes > 0 => pending_writes = true,
771                Action::WriteError(ref x) if x.is_some() => pending_writes = true,
772                Action::WriteZeroes(nbytes) if *nbytes > 0 => pending_writes = true,
773                Action::WriteShutdown(ref mut observed) => {
774                    *observed = true;
775                    return Ok(());
776                }
777                Action::StopChecking => checks_enabled = false,
778                Action::Wait(..) => {
779                    break;
780                }
781                _ => {}
782            }
783
784            if pending_writes {
785                #[cfg(feature = "panicless-mode")]
786                if self.checks_enabled && self.panicless_tx.is_some() {
787                    let _ = self.panicless_tx.take().unwrap().send(MockOutcome {
788                        outcome: Err(MockOutcomeError::ShutdownInsteadOfWrite),
789                        total_read_bytes: self.read_bytes,
790                        total_written_bytes: self.written_bytes,
791                    });
792                    self.checks_enabled = false;
793                    return Err(io::ErrorKind::InvalidData.into());
794                }
795
796                if checks_enabled {
797                    panic!(
798                       "Unexpected shutdown (there are more pending write actions) name={} r={} w={} remaining actions: {}",
799                       self.name,
800                       self.read_bytes,
801                       self.written_bytes,
802                       n_remaining_actions - i
803                   );
804                }
805            }
806        }
807
808        Ok(())
809    }
810
811    fn remaining_wait(&mut self) -> Option<Duration> {
812        match self.action() {
813            Some(&mut Action::Wait(dur)) => Some(dur),
814            _ => None,
815        }
816    }
817
818    fn action(&mut self) -> Option<&mut Action> {
819        loop {
820            if self.actions.is_empty() {
821                return None;
822            }
823
824            match self.actions[0] {
825                Action::Read(ref mut data) => {
826                    if !data.is_empty() {
827                        break;
828                    }
829                }
830                Action::ReadZeroes(n) => {
831                    if n > 0 {
832                        break;
833                    }
834                }
835                Action::ReadEof(ref observed) => {
836                    if !observed {
837                        break;
838                    }
839                }
840                Action::Write(ref mut data) => {
841                    if !data.is_empty() {
842                        break;
843                    }
844                }
845                Action::WriteZeroes(n) => {
846                    if n > 0 {
847                        break;
848                    }
849                }
850                Action::IgnoreWritten(n) => {
851                    if n > 0 {
852                        break;
853                    }
854                }
855                Action::Wait(ref mut dur) => {
856                    if let Some(until) = self.waiting {
857                        let now = Instant::now();
858
859                        if now < until {
860                            break;
861                        } else {
862                            self.waiting = None;
863                        }
864                    } else {
865                        self.waiting = Some(Instant::now() + *dur);
866                        break;
867                    }
868                }
869                Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
870                    if error.is_some() {
871                        break;
872                    }
873                }
874                Action::WriteShutdown(ref observed) => {
875                    if !*observed {
876                        break;
877                    }
878                }
879                Action::StopChecking => {
880                    self.checks_enabled = false;
881                    break;
882                }
883            }
884
885            let _action = self.actions.pop_front();
886        }
887
888        self.actions.front_mut()
889    }
890}
891
892// ===== impl Inner =====
893
894impl Mock {
895    fn maybe_wakeup_reader(&mut self) {
896        match self.inner.action() {
897            Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
898                if let Some(waker) = self.inner.read_wait.take() {
899                    waker.wake();
900                }
901            }
902            _ => {}
903        }
904    }
905}
906
907impl AsyncRead for Mock {
908    fn poll_read(
909        mut self: Pin<&mut Self>,
910        cx: &mut task::Context<'_>,
911        buf: &mut ReadBuf<'_>,
912    ) -> Poll<io::Result<()>> {
913        loop {
914            if let Some(ref mut sleep) = self.inner.sleep {
915                ready!(Pin::new(sleep).poll(cx));
916            }
917
918            // If a sleep is set, it has already fired
919            self.inner.sleep = None;
920
921            // Capture 'filled' to monitor if it changed
922            let filled = buf.filled().len();
923
924            match self.inner.read(buf) {
925                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
926                    if let Some(rem) = self.inner.remaining_wait() {
927                        let until = Instant::now() + rem;
928                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
929                    } else {
930                        self.inner.read_wait = Some(cx.waker().clone());
931                        return Poll::Pending;
932                    }
933                }
934                Ok(()) => {
935                    if buf.filled().len() == filled {
936                        match ready!(self.inner.poll_action(cx)) {
937                            Some(action) => {
938                                self.inner.actions.push_back(action);
939                                continue;
940                            }
941                            None => {
942                                return Poll::Ready(Ok(()));
943                            }
944                        }
945                    } else {
946                        return Poll::Ready(Ok(()));
947                    }
948                }
949                Err(e) => return Poll::Ready(Err(e)),
950            }
951        }
952    }
953}
954
955impl AsyncWrite for Mock {
956    fn poll_write(
957        mut self: Pin<&mut Self>,
958        cx: &mut task::Context<'_>,
959        buf: &[u8],
960    ) -> Poll<io::Result<usize>> {
961        loop {
962            if let Some(ref mut sleep) = self.inner.sleep {
963                ready!(Pin::new(sleep).poll(cx));
964            }
965
966            // If a sleep is set, it has already fired
967            self.inner.sleep = None;
968
969            if self.inner.actions.is_empty() {
970                match self.inner.poll_action(cx) {
971                    Poll::Pending => {
972                        // do not propagate pending
973                    }
974                    Poll::Ready(Some(action)) => {
975                        self.inner.actions.push_back(action);
976                    }
977                    Poll::Ready(None) => {
978                        #[cfg(feature = "panicless-mode")]
979                        if self.inner.checks_enabled && self.inner.panicless_tx.is_some() {
980                            let _ = self.inner.panicless_tx.take().unwrap().send(MockOutcome {
981                                outcome: Err(MockOutcomeError::UnexpectedWrite),
982                                total_read_bytes: self.inner.read_bytes,
983                                total_written_bytes: self.inner.written_bytes,
984                            });
985                            self.inner.checks_enabled = false;
986                        }
987
988                        if self.inner.checks_enabled {
989                            panic!("unexpected write {}", self.pmsg());
990                        } else {
991                            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
992                        }
993                    }
994                }
995            }
996
997            match self.inner.write(buf) {
998                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
999                    if let Some(rem) = self.inner.remaining_wait() {
1000                        let until = Instant::now() + rem;
1001                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
1002                    } else {
1003                        #[cfg(feature = "panicless-mode")]
1004                        if self.inner.checks_enabled && self.inner.panicless_tx.is_some() {
1005                            let _ = self.inner.panicless_tx.take().unwrap().send(MockOutcome {
1006                                outcome: Err(MockOutcomeError::Other),
1007                                total_read_bytes: self.inner.read_bytes,
1008                                total_written_bytes: self.inner.written_bytes,
1009                            });
1010                            self.inner.checks_enabled = false;
1011                        }
1012
1013                        if self.inner.checks_enabled {
1014                            panic!("unexpected WouldBlock {}", self.pmsg());
1015                        } else {
1016                            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
1017                        }
1018                    }
1019                }
1020                Ok(0) => {
1021                    // TODO: Is this correct?
1022                    if self.inner.action().is_some() {
1023                        return Poll::Pending;
1024                    }
1025
1026                    // TODO: Extract
1027                    match ready!(self.inner.poll_action(cx)) {
1028                        Some(action) => {
1029                            self.inner.actions.push_back(action);
1030                            continue;
1031                        }
1032                        None => {
1033                            #[cfg(feature = "panicless-mode")]
1034                            if self.inner.checks_enabled && self.inner.panicless_tx.is_some() {
1035                                let _ = self.inner.panicless_tx.take().unwrap().send(MockOutcome {
1036                                    outcome: Err(MockOutcomeError::UnexpectedWrite),
1037                                    total_read_bytes: self.inner.read_bytes,
1038                                    total_written_bytes: self.inner.written_bytes,
1039                                });
1040                                self.inner.checks_enabled = false;
1041                            }
1042
1043                            if self.inner.checks_enabled {
1044                                panic!("unexpected write {}", self.pmsg());
1045                            } else {
1046                                return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
1047                            }
1048                        }
1049                    }
1050                }
1051                ret => {
1052                    self.maybe_wakeup_reader();
1053                    return Poll::Ready(ret);
1054                }
1055            }
1056        }
1057    }
1058
1059    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
1060        Poll::Ready(Ok(()))
1061    }
1062
1063    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
1064        if !self.inner.shutdown_checking_enabled {
1065            return Poll::Ready(Ok(()));
1066        }
1067        loop {
1068            if let Some(ref mut sleep) = self.inner.sleep {
1069                ready!(Pin::new(sleep).poll(cx));
1070            }
1071
1072            // If a sleep is set, it has already fired
1073            self.inner.sleep = None;
1074
1075            if self.inner.actions.is_empty() {
1076                match self.inner.poll_action(cx) {
1077                    Poll::Pending => {
1078                        // do not propagate pending
1079                    }
1080                    Poll::Ready(Some(action)) => {
1081                        self.inner.actions.push_back(action);
1082                    }
1083                    Poll::Ready(None) => {
1084                        // OK to omit shutdown at the end
1085                        return Poll::Ready(Ok(()));
1086                    }
1087                }
1088            }
1089
1090            match self.inner.shutdown() {
1091                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1092                    if let Some(rem) = self.inner.remaining_wait() {
1093                        let until = Instant::now() + rem;
1094                        self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
1095                    } else {
1096                        #[cfg(feature = "panicless-mode")]
1097                        if self.inner.checks_enabled && self.inner.panicless_tx.is_some() {
1098                            let _ = self.inner.panicless_tx.take().unwrap().send(MockOutcome {
1099                                outcome: Err(MockOutcomeError::Other),
1100                                total_read_bytes: self.inner.read_bytes,
1101                                total_written_bytes: self.inner.written_bytes,
1102                            });
1103                            self.inner.checks_enabled = false;
1104                        }
1105
1106                        if self.inner.checks_enabled {
1107                            panic!("unexpected WouldBlock {}", self.pmsg());
1108                        } else {
1109                            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
1110                        }
1111                    }
1112                }
1113                ret => {
1114                    self.maybe_wakeup_reader();
1115                    return Poll::Ready(ret);
1116                }
1117            }
1118        }
1119    }
1120}
1121
1122/// Ensures that Mock isn't dropped with data "inside".
1123impl Drop for Mock {
1124    fn drop(&mut self) {
1125        #[cfg(feature = "panicless-mode")]
1126        if let Some(tx) = self.inner.panicless_tx.take() {
1127            let mut outcome = Ok(());
1128
1129            if self.inner.checks_enabled {
1130                self.inner.actions.iter().for_each(|a| match a {
1131                    Action::Read(data) if !data.is_empty() => {
1132                        outcome = Err(MockOutcomeError::RemainingUnreadData)
1133                    }
1134                    Action::ReadZeroes(nbytes) if *nbytes > 0 => {
1135                        outcome = Err(MockOutcomeError::RemainingUnreadData)
1136                    }
1137                    Action::ReadEof(observed) if !observed => {
1138                        outcome = Err(MockOutcomeError::RemainingUnreadData)
1139                    }
1140                    Action::Write(data) if !data.is_empty() => {
1141                        outcome = Err(MockOutcomeError::RemainingUnwrittenData)
1142                    }
1143                    Action::WriteZeroes(nbytes) if *nbytes > 0 => {
1144                        outcome = Err(MockOutcomeError::RemainingUnwrittenData)
1145                    }
1146                    Action::IgnoreWritten(nbytes) if *nbytes > 0 => {
1147                        outcome = Err(MockOutcomeError::RemainingUnwrittenData)
1148                    }
1149                    _ => (),
1150                });
1151            }
1152
1153            let _ = tx.send(MockOutcome {
1154                outcome,
1155                total_read_bytes: self.inner.read_bytes,
1156                total_written_bytes: self.inner.written_bytes,
1157            });
1158            return;
1159        }
1160
1161        // Avoid double panicking, since makes debugging much harder.
1162        if std::thread::panicking() {
1163            return;
1164        }
1165
1166        if !self.inner.checks_enabled {
1167            return;
1168        }
1169
1170        self.inner.actions.iter().for_each(|a| match a {
1171            Action::Read(data) => assert!(
1172                data.is_empty(),
1173                "There is still data left to read. {}",
1174                self.pmsg()
1175            ),
1176            Action::ReadZeroes(nbytes) => assert!(
1177                *nbytes == 0,
1178                "There is still data left to read. {}",
1179                self.pmsg()
1180            ),
1181            Action::ReadEof(observed) => assert!(
1182                observed,
1183                "There is still a read EOF event that was not observed {}",
1184                self.pmsg()
1185            ),
1186            Action::Write(data) => assert!(
1187                data.is_empty(),
1188                "There is still data left to write. {}",
1189                self.pmsg()
1190            ),
1191            Action::WriteZeroes(nbytes) => assert!(
1192                *nbytes == 0,
1193                "There is still data left to write. {}",
1194                self.pmsg()
1195            ),
1196            Action::IgnoreWritten(nbytes) => assert!(
1197                *nbytes == 0,
1198                "There is still data left to write (even though content is to be ignored). {}",
1199                self.pmsg()
1200            ),
1201            _ => (),
1202        });
1203    }
1204}
1205/*
1206/// Returns `true` if called from the context of a futures-rs Task
1207fn is_task_ctx() -> bool {
1208    use std::panic;
1209
1210    // Save the existing panic hook
1211    let h = panic::take_hook();
1212
1213    // Install a new one that does nothing
1214    panic::set_hook(Box::new(|_| {}));
1215
1216    // Attempt to call the fn
1217    let r = panic::catch_unwind(|| task::current()).is_ok();
1218
1219    // Re-install the old one
1220    panic::set_hook(h);
1221
1222    // Return the result
1223    r
1224}
1225*/
1226
1227impl fmt::Debug for Inner {
1228    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1229        if self.name.is_empty() {
1230            write!(f, "Inner {{...}}")
1231        } else {
1232            write!(f, "Inner {{name={}, ...}}", self.name)
1233        }
1234    }
1235}
1236
1237struct PanicMsgSnippet<'a>(&'a Inner);
1238
1239impl<'a> fmt::Display for PanicMsgSnippet<'a> {
1240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1241        if self.0.name.is_empty() {
1242            write!(
1243                f,
1244                "({} actions remain, {} bytes was read, {} bytes was written)",
1245                self.0.actions.len(),
1246                self.0.read_bytes,
1247                self.0.written_bytes,
1248            )
1249        } else {
1250            write!(
1251                f,
1252                "(name {}, {} actions remain, {} bytes was read, {} bytes was written)",
1253                self.0.name,
1254                self.0.actions.len(),
1255                self.0.read_bytes,
1256                self.0.written_bytes,
1257            )
1258        }
1259    }
1260}
1261
1262impl Mock {
1263    fn pmsg(&self) -> PanicMsgSnippet<'_> {
1264        PanicMsgSnippet(&self.inner)
1265    }
1266}