1#![cfg_attr(docsrs_alt, feature(doc_cfg))]
2use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
23use tokio::sync::mpsc;
24use tokio::time::{self, Duration, Instant, Sleep};
25use tokio_stream::wrappers::UnboundedReceiverStream;
26
27use futures_core::Stream;
28use std::collections::VecDeque;
29use std::fmt;
30use std::future::Future;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{self, ready, Poll, Waker};
34use std::{cmp, io};
35
36#[cfg(feature = "text-scenarios")]
37mod text_scenario;
38
39#[cfg(feature = "text-scenarios")]
40#[cfg_attr(docsrs_alt, doc(cfg(feature = "text-scenarios")))]
41pub use text_scenario::ParseError;
42
/// Reason a panicless-mode [`Mock`] reports failure instead of panicking.
#[cfg(feature = "panicless-mode")]
#[cfg_attr(docsrs_alt, doc(cfg(feature = "panicless-mode")))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MockOutcomeError {
    /// A write arrived while no scripted actions remained and no more could arrive.
    UnexpectedWrite,
    /// A write arrived while the script expected the writer to be shut down.
    WriteInsteadOfShutdown,
    /// A shutdown arrived while scripted write actions were still pending.
    ShutdownInsteadOfWrite,
    /// A written byte differed from the byte the script expected.
    WrittenByteMismatch {
        /// Byte the script expected at this position.
        expected: u8,
        /// Byte that was actually written.
        actual: u8,
    },
    /// The mock was dropped while scripted read data (or an EOF) was still unconsumed.
    RemainingUnreadData,
    /// The mock was dropped while scripted writes were still outstanding.
    RemainingUnwrittenData,
    /// Any other failure (currently: an unexpected `WouldBlock` condition).
    Other,
}
56
/// Human-readable description of each failure reason.
#[cfg(feature = "panicless-mode")]
impl fmt::Display for MockOutcomeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Arms are listed in the enum's declaration order.
        match self {
            Self::UnexpectedWrite => f.write_str("unexpected write"),
            Self::WriteInsteadOfShutdown => f.write_str("write where shutdown was expected"),
            Self::ShutdownInsteadOfWrite => f.write_str("shutdown where a write was expected"),
            Self::WrittenByteMismatch { expected, actual } => {
                write!(f, "mismatching byte, expected {expected}, got {actual}")
            }
            Self::RemainingUnreadData => f.write_str("data remains to be read"),
            Self::RemainingUnwrittenData => f.write_str("data remains to be written"),
            Self::Other => f.write_str("other error"),
        }
    }
}

/// Lets the error take part in `?` conversions and `Box<dyn Error>` chains.
#[cfg(feature = "panicless-mode")]
impl std::error::Error for MockOutcomeError {}
77
/// Final report delivered by a panicless-mode [`Mock`] through its oneshot channel.
#[cfg(feature = "panicless-mode")]
#[cfg_attr(docsrs_alt, doc(cfg(feature = "panicless-mode")))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct MockOutcome {
    /// `Ok(())` when the script was followed, otherwise the reason it was not.
    pub outcome: Result<(), MockOutcomeError>,

    /// Number of bytes handed to readers by the time the report was produced.
    pub total_read_bytes: u64,
    /// Number of bytes accepted from writers by the time the report was produced.
    pub total_written_bytes: u64,
}
/// One-line summary: success or the error, followed by the byte counters.
#[cfg(feature = "panicless-mode")]
impl fmt::Display for MockOutcome {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (read, written) = (self.total_read_bytes, self.total_written_bytes);

        if let Err(e) = self.outcome {
            return write!(
                f,
                "error ({e}) after reading {} and writing {} bytes",
                read, written
            );
        }

        write!(
            f,
            "success after reading {} and writing {} bytes",
            read, written
        )
    }
}
106
107#[derive(Debug)]
112pub struct Mock {
113 inner: Inner,
114}
115
116#[derive(Debug)]
118pub struct Handle {
119 tx: mpsc::UnboundedSender<Action>,
120}
121
122#[derive(Debug, Clone, Default)]
124pub struct Builder {
125 actions: VecDeque<Action>,
127 name: String,
128 shutdown_checking_enabled: bool,
129}
130
/// One step of a mock's script.
#[derive(Debug, Clone)]
enum Action {
    /// Bytes still to be handed to the reader; drained as they are read.
    Read(Vec<u8>),
    /// Bytes the writer is still expected to write; drained as they arrive.
    Write(Vec<u8>),
    /// Pause for the given duration before moving on.
    Wait(Duration),
    /// Error to return from a read; becomes `None` once delivered.
    ReadError(Option<Arc<io::Error>>),
    /// Error to return from a write; becomes `None` once delivered.
    WriteError(Option<Arc<io::Error>>),
    /// Expected writer shutdown; the flag records whether it has been observed.
    WriteShutdown(bool),
    /// Number of zero bytes still to be handed to the reader.
    ReadZeroes(usize),
    /// Number of zero bytes the writer is still expected to write.
    WriteZeroes(usize),
    /// Number of written bytes still to be accepted without content checks.
    IgnoreWritten(usize),
    /// End-of-file marker for the reader; the flag records whether it was observed.
    ReadEof(bool),
    /// From this point on the mock no longer validates what happens to it.
    StopChecking,
}
147
148struct Inner {
149 actions: VecDeque<Action>,
150 waiting: Option<Instant>,
151 sleep: Option<Pin<Box<Sleep>>>,
152 read_wait: Option<Waker>,
153 rx: UnboundedReceiverStream<Action>,
154 name: String,
155 checks_enabled: bool,
156 read_bytes: u64,
157 written_bytes: u64,
158 shutdown_checking_enabled: bool,
159 #[cfg(feature = "panicless-mode")]
160 panicless_tx: Option<tokio::sync::oneshot::Sender<MockOutcome>>,
161}
162
163impl Builder {
164 pub fn new() -> Self {
166 Self::default()
167 }
168
169 pub fn read(&mut self, buf: &[u8]) -> &mut Self {
174 self.actions.push_back(Action::Read(buf.into()));
175 self
176 }
177
178 pub fn read_zeroes(&mut self, nbytes: usize) -> &mut Self {
184 self.actions.push_back(Action::ReadZeroes(nbytes));
185 self
186 }
187
188 pub fn eof(&mut self) -> &mut Self {
192 self.actions.push_back(Action::ReadEof(false));
193 self
194 }
195
196 pub fn read_error(&mut self, error: io::Error) -> &mut Self {
201 let error = Some(error.into());
202 self.actions.push_back(Action::ReadError(error));
203 self
204 }
205
206 pub fn write(&mut self, buf: &[u8]) -> &mut Self {
211 self.actions.push_back(Action::Write(buf.into()));
212 self
213 }
214
215 pub fn write_zeroes(&mut self, nbytes: usize) -> &mut Self {
220 self.actions.push_back(Action::WriteZeroes(nbytes));
221 self
222 }
223
224 pub fn write_ignore(&mut self, nbytes: usize) -> &mut Self {
232 self.actions.push_back(Action::IgnoreWritten(nbytes));
233 self
234 }
235
236 pub fn write_error(&mut self, error: io::Error) -> &mut Self {
241 let error = Some(error.into());
242 self.actions.push_back(Action::WriteError(error));
243 self
244 }
245
246 pub fn enable_shutdown_checking(&mut self) -> &mut Self {
250 self.shutdown_checking_enabled = true;
251 self
252 }
253
254 pub fn write_shutdown(&mut self) -> &mut Self {
258 self.shutdown_checking_enabled = true;
259 self.actions.push_back(Action::WriteShutdown(false));
260 self
261 }
262
263 pub fn stop_checking(&mut self) -> &mut Self {
270 self.actions.push_back(Action::StopChecking);
271 self
272 }
273
274 pub fn wait(&mut self, duration: Duration) -> &mut Self {
279 let duration = cmp::max(duration, Duration::from_millis(1));
280 self.actions.push_back(Action::Wait(duration));
281 self
282 }
283
284 pub fn name(&mut self, name: impl Into<String>) -> &mut Self {
286 self.name = name.into();
287 self
288 }
289
290 #[cfg(feature = "text-scenarios")]
334 #[cfg_attr(docsrs_alt, doc(cfg(feature = "text-scenarios")))]
335 pub fn text_scenario(&mut self, scenario: &str) -> Result<&mut Self, ParseError> {
336 let (items, name) = text_scenario::parse_text_scenario(scenario)?;
337 if let Some(name) = name {
338 self.name = name;
339 }
340 self.actions.extend(items);
341 Ok(self)
342 }
343
344 pub fn build(&mut self) -> Mock {
346 let (mock, _) = self.build_with_handle();
347 mock
348 }
349
350 pub fn build_with_handle(&mut self) -> (Mock, Handle) {
352 let (inner, handle) = Inner::new(
353 self.actions.clone(),
354 self.name.clone(),
355 self.shutdown_checking_enabled,
356 );
357
358 let mock = Mock { inner };
359
360 (mock, handle)
361 }
362
363 #[cfg(feature = "panicless-mode")]
367 #[cfg_attr(docsrs_alt, doc(cfg(feature = "panicless-mode")))]
368 pub fn build_panicless(
369 &mut self,
370 ) -> (Mock, Handle, tokio::sync::oneshot::Receiver<MockOutcome>) {
371 let (mut inner, handle) = Inner::new(
372 self.actions.clone(),
373 self.name.clone(),
374 self.shutdown_checking_enabled,
375 );
376
377 let (tx, rx) = tokio::sync::oneshot::channel();
378
379 inner.panicless_tx = Some(tx);
380
381 let mock = Mock { inner };
382
383 (mock, handle, rx)
384 }
385}
386
387impl Handle {
388 pub fn read(&mut self, buf: &[u8]) -> &mut Self {
393 self.tx.send(Action::Read(buf.into())).unwrap();
394 self
395 }
396
397 pub fn read_zeroes(&mut self, nbytes: usize) -> &mut Self {
403 self.tx.send(Action::ReadZeroes(nbytes)).unwrap();
404 self
405 }
406
407 pub fn eof(&mut self) -> &mut Self {
411 self.tx.send(Action::ReadEof(false)).unwrap();
412 self
413 }
414
415 pub fn read_error(&mut self, error: io::Error) -> &mut Self {
420 let error = Some(error.into());
421 self.tx.send(Action::ReadError(error)).unwrap();
422 self
423 }
424
425 pub fn write(&mut self, buf: &[u8]) -> &mut Self {
430 self.tx.send(Action::Write(buf.into())).unwrap();
431 self
432 }
433
434 pub fn write_zeroes(&mut self, nbytes: usize) -> &mut Self {
439 self.tx.send(Action::WriteZeroes(nbytes)).unwrap();
440 self
441 }
442
443 pub fn write_ignore(&mut self, nbytes: usize) -> &mut Self {
451 self.tx.send(Action::IgnoreWritten(nbytes)).unwrap();
452 self
453 }
454
455 pub fn write_error(&mut self, error: io::Error) -> &mut Self {
460 let error = Some(error.into());
461 self.tx.send(Action::WriteError(error)).unwrap();
462 self
463 }
464
465 pub fn write_shutdown(&mut self) -> &mut Self {
469 self.tx.send(Action::WriteShutdown(false)).unwrap();
470 self
471 }
472
473 pub fn stop_checking(&mut self) -> &mut Self {
480 self.tx.send(Action::StopChecking).unwrap();
481 self
482 }
483
484 #[cfg(feature = "text-scenarios")]
490 #[cfg_attr(docsrs_alt, doc(cfg(feature = "text-scenarios")))]
491 pub fn text_scenario(&mut self, scenario: &str) -> Result<&mut Self, ParseError> {
492 let (items, name) = text_scenario::parse_text_scenario(scenario)?;
493 if let Some(_name) = name {
494 }
496 for x in items {
497 self.tx.send(x).unwrap();
498 }
499 Ok(self)
500 }
501}
502
503impl Inner {
504 fn new(
505 actions: VecDeque<Action>,
506 name: String,
507 shutdown_checking_enabled: bool,
508 ) -> (Inner, Handle) {
509 let (tx, rx) = mpsc::unbounded_channel();
510
511 let rx = UnboundedReceiverStream::new(rx);
512
513 let inner = Inner {
514 actions,
515 sleep: None,
516 read_wait: None,
517 rx,
518 waiting: None,
519 name,
520 checks_enabled: true,
521 read_bytes: 0,
522 written_bytes: 0,
523 shutdown_checking_enabled,
524 #[cfg(feature = "panicless-mode")]
525 panicless_tx: None,
526 };
527
528 let handle = Handle { tx };
529
530 (inner, handle)
531 }
532
533 fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
534 Pin::new(&mut self.rx).poll_next(cx)
535 }
536
537 fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
538 match self.action() {
539 Some(&mut Action::ReadZeroes(ref mut nbytes)) => {
540 let n = cmp::min(dst.remaining(), *nbytes);
541 let nfilled = dst.filled().len();
542 dst.initialize_unfilled_to(nfilled + n)[nfilled..].fill(0);
543 dst.set_filled(nfilled + n);
544 *nbytes -= n;
545 self.read_bytes += n as u64;
546 Ok(())
547 }
548 Some(&mut Action::ReadEof(ref mut observed)) => {
549 *observed = true;
550 Ok(())
551 }
552 Some(&mut Action::Read(ref mut data)) => {
553 let n = cmp::min(dst.remaining(), data.len());
555
556 dst.put_slice(&data[..n]);
558
559 data.drain(..n);
561
562 self.read_bytes += n as u64;
563
564 Ok(())
565 }
566 Some(&mut Action::ReadError(ref mut err)) => {
567 let err = err.take().expect("Should have been removed from actions.");
569 let err = Arc::try_unwrap(err).expect("There are no other references.");
570 Err(err)
571 }
572 Some(_) => {
573 Err(io::ErrorKind::WouldBlock.into())
575 }
576 None => Ok(()),
577 }
578 }
579
580 fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
581 let mut ret = 0;
582
583 if self.actions.is_empty() {
584 return Err(io::ErrorKind::BrokenPipe.into());
585 }
586
587 if let Some(&mut Action::Wait(..)) = self.action() {
588 return Err(io::ErrorKind::WouldBlock.into());
589 }
590
591 if let Some(&mut Action::WriteError(ref mut err)) = self.action() {
592 let err = err.take().expect("Should have been removed from actions.");
593 let err = Arc::try_unwrap(err).expect("There are no other references.");
594 return Err(err);
595 }
596
597 let mut checks_enabled = self.checks_enabled;
598
599 let n_remaining_actions = self.actions.len();
600 for i in 0..n_remaining_actions {
601 let action = &mut self.actions[i];
602 let ignore_written = matches!(action, Action::IgnoreWritten { .. });
603 match action {
604 Action::Write(ref mut expect) => {
605 let n = cmp::min(src.len(), expect.len());
606
607 #[allow(unused_mut)]
608 let mut already_checked = false;
609
610 #[cfg(feature = "panicless-mode")]
611 if checks_enabled && self.panicless_tx.is_some() {
612 for i in 0..n {
613 let expected = expect[i];
614 let actual = src[i];
615 if expected != actual {
616 let _ = self.panicless_tx.take().unwrap().send(MockOutcome {
617 outcome: Err(MockOutcomeError::WrittenByteMismatch {
618 expected,
619 actual,
620 }),
621 total_read_bytes: self.read_bytes,
622 total_written_bytes: self.written_bytes,
623 });
624 self.checks_enabled = false;
625 }
626 self.written_bytes += 1;
627 }
628 already_checked = true;
629 }
630
631 if self.checks_enabled && !already_checked {
632 assert_eq!(
633 &src[..n],
634 &expect[..n],
635 "name={} r={} w={} remaining actions: {}",
636 self.name,
637 self.read_bytes,
638 self.written_bytes,
639 n_remaining_actions - i
640 );
641 self.written_bytes += n as u64;
642 }
643 if !already_checked {
644 self.written_bytes += n as u64;
645 }
646
647 expect.drain(..n);
649 src = &src[n..];
650
651 ret += n;
652
653 if src.is_empty() {
654 return Ok(ret);
655 }
656 }
657 Action::WriteZeroes(ref mut nbytes) | Action::IgnoreWritten(ref mut nbytes) => {
658 let n = cmp::min(src.len(), *nbytes);
659
660 #[allow(unused_mut)]
661 let mut already_checked = false;
662
663 #[cfg(feature = "panicless-mode")]
664 if checks_enabled && !ignore_written && self.panicless_tx.is_some() {
665 for i in 0..n {
666 let expected = 0;
667 let actual = src[i];
668 if expected != actual {
669 let _ = self.panicless_tx.take().unwrap().send(MockOutcome {
671 outcome: Err(MockOutcomeError::WrittenByteMismatch {
672 expected,
673 actual,
674 }),
675 total_read_bytes: self.read_bytes,
676 total_written_bytes: self.written_bytes,
677 });
678 self.checks_enabled = false;
679 }
680 self.written_bytes += 1;
681 }
682 already_checked = true;
683 }
684
685 if checks_enabled && !ignore_written && !already_checked {
686 for (j, x) in src[..n].iter().enumerate() {
687 self.written_bytes += 1;
688 assert_eq!(
689 *x,
690 0,
691 "byte_index={j} r={} w={} name={} remaining actions: {}",
692 self.read_bytes,
693 self.written_bytes,
694 self.name,
695 n_remaining_actions - i
696 );
697 }
698 } else {
699 if !already_checked {
700 self.written_bytes += n as u64;
701 }
702 }
703
704 *nbytes -= n;
706 src = &src[n..];
707
708 ret += n;
709
710 if src.is_empty() {
711 return Ok(ret);
712 }
713 }
714 Action::WriteShutdown(ref mut observed) => {
715 #[cfg(feature = "panicless-mode")]
716 if checks_enabled && self.panicless_tx.is_some() {
717 let _ = self.panicless_tx.take().unwrap().send(MockOutcome {
718 outcome: Err(MockOutcomeError::WriteInsteadOfShutdown),
719 total_read_bytes: self.read_bytes,
720 total_written_bytes: self.written_bytes,
721 });
722 self.checks_enabled = false;
723 *observed = true;
724 return Err(io::ErrorKind::InvalidData.into());
725 }
726
727 if checks_enabled {
728 panic!(
729 "unexpected write (a shutdown was expected) r={} w={} name={} remaining actions: {}",
730 self.read_bytes,
731 self.written_bytes,
732 self.name,
733 n_remaining_actions - i
734 );
735 } else {
736 *observed = true;
737 return Err(io::ErrorKind::InvalidData.into());
738 }
739 }
740 Action::StopChecking => checks_enabled = false,
741 Action::Wait(..) | Action::WriteError(..) => {
742 break;
743 }
744 _ => {}
745 }
746
747 }
749
750 Ok(ret)
751 }
752
753 fn shutdown(&mut self) -> io::Result<()> {
754 if self.actions.is_empty() {
755 return Ok(());
756 }
757
758 if let Some(&mut Action::Wait(..)) = self.action() {
759 return Err(io::ErrorKind::WouldBlock.into());
760 }
761
762 let mut checks_enabled = self.checks_enabled;
763
764 let n_remaining_actions = self.actions.len();
765 for i in 0..n_remaining_actions {
766 let action = &mut self.actions[i];
767 let mut pending_writes = false;
768 match action {
769 Action::Write(ref buf) if !buf.is_empty() => pending_writes = true,
770 Action::IgnoreWritten(nbytes) if *nbytes > 0 => pending_writes = true,
771 Action::WriteError(ref x) if x.is_some() => pending_writes = true,
772 Action::WriteZeroes(nbytes) if *nbytes > 0 => pending_writes = true,
773 Action::WriteShutdown(ref mut observed) => {
774 *observed = true;
775 return Ok(());
776 }
777 Action::StopChecking => checks_enabled = false,
778 Action::Wait(..) => {
779 break;
780 }
781 _ => {}
782 }
783
784 if pending_writes {
785 #[cfg(feature = "panicless-mode")]
786 if self.checks_enabled && self.panicless_tx.is_some() {
787 let _ = self.panicless_tx.take().unwrap().send(MockOutcome {
788 outcome: Err(MockOutcomeError::ShutdownInsteadOfWrite),
789 total_read_bytes: self.read_bytes,
790 total_written_bytes: self.written_bytes,
791 });
792 self.checks_enabled = false;
793 return Err(io::ErrorKind::InvalidData.into());
794 }
795
796 if checks_enabled {
797 panic!(
798 "Unexpected shutdown (there are more pending write actions) name={} r={} w={} remaining actions: {}",
799 self.name,
800 self.read_bytes,
801 self.written_bytes,
802 n_remaining_actions - i
803 );
804 }
805 }
806 }
807
808 Ok(())
809 }
810
811 fn remaining_wait(&mut self) -> Option<Duration> {
812 match self.action() {
813 Some(&mut Action::Wait(dur)) => Some(dur),
814 _ => None,
815 }
816 }
817
818 fn action(&mut self) -> Option<&mut Action> {
819 loop {
820 if self.actions.is_empty() {
821 return None;
822 }
823
824 match self.actions[0] {
825 Action::Read(ref mut data) => {
826 if !data.is_empty() {
827 break;
828 }
829 }
830 Action::ReadZeroes(n) => {
831 if n > 0 {
832 break;
833 }
834 }
835 Action::ReadEof(ref observed) => {
836 if !observed {
837 break;
838 }
839 }
840 Action::Write(ref mut data) => {
841 if !data.is_empty() {
842 break;
843 }
844 }
845 Action::WriteZeroes(n) => {
846 if n > 0 {
847 break;
848 }
849 }
850 Action::IgnoreWritten(n) => {
851 if n > 0 {
852 break;
853 }
854 }
855 Action::Wait(ref mut dur) => {
856 if let Some(until) = self.waiting {
857 let now = Instant::now();
858
859 if now < until {
860 break;
861 } else {
862 self.waiting = None;
863 }
864 } else {
865 self.waiting = Some(Instant::now() + *dur);
866 break;
867 }
868 }
869 Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => {
870 if error.is_some() {
871 break;
872 }
873 }
874 Action::WriteShutdown(ref observed) => {
875 if !*observed {
876 break;
877 }
878 }
879 Action::StopChecking => {
880 self.checks_enabled = false;
881 break;
882 }
883 }
884
885 let _action = self.actions.pop_front();
886 }
887
888 self.actions.front_mut()
889 }
890}
891
892impl Mock {
895 fn maybe_wakeup_reader(&mut self) {
896 match self.inner.action() {
897 Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => {
898 if let Some(waker) = self.inner.read_wait.take() {
899 waker.wake();
900 }
901 }
902 _ => {}
903 }
904 }
905}
906
907impl AsyncRead for Mock {
908 fn poll_read(
909 mut self: Pin<&mut Self>,
910 cx: &mut task::Context<'_>,
911 buf: &mut ReadBuf<'_>,
912 ) -> Poll<io::Result<()>> {
913 loop {
914 if let Some(ref mut sleep) = self.inner.sleep {
915 ready!(Pin::new(sleep).poll(cx));
916 }
917
918 self.inner.sleep = None;
920
921 let filled = buf.filled().len();
923
924 match self.inner.read(buf) {
925 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
926 if let Some(rem) = self.inner.remaining_wait() {
927 let until = Instant::now() + rem;
928 self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
929 } else {
930 self.inner.read_wait = Some(cx.waker().clone());
931 return Poll::Pending;
932 }
933 }
934 Ok(()) => {
935 if buf.filled().len() == filled {
936 match ready!(self.inner.poll_action(cx)) {
937 Some(action) => {
938 self.inner.actions.push_back(action);
939 continue;
940 }
941 None => {
942 return Poll::Ready(Ok(()));
943 }
944 }
945 } else {
946 return Poll::Ready(Ok(()));
947 }
948 }
949 Err(e) => return Poll::Ready(Err(e)),
950 }
951 }
952 }
953}
954
955impl AsyncWrite for Mock {
956 fn poll_write(
957 mut self: Pin<&mut Self>,
958 cx: &mut task::Context<'_>,
959 buf: &[u8],
960 ) -> Poll<io::Result<usize>> {
961 loop {
962 if let Some(ref mut sleep) = self.inner.sleep {
963 ready!(Pin::new(sleep).poll(cx));
964 }
965
966 self.inner.sleep = None;
968
969 if self.inner.actions.is_empty() {
970 match self.inner.poll_action(cx) {
971 Poll::Pending => {
972 }
974 Poll::Ready(Some(action)) => {
975 self.inner.actions.push_back(action);
976 }
977 Poll::Ready(None) => {
978 #[cfg(feature = "panicless-mode")]
979 if self.inner.checks_enabled && self.inner.panicless_tx.is_some() {
980 let _ = self.inner.panicless_tx.take().unwrap().send(MockOutcome {
981 outcome: Err(MockOutcomeError::UnexpectedWrite),
982 total_read_bytes: self.inner.read_bytes,
983 total_written_bytes: self.inner.written_bytes,
984 });
985 self.inner.checks_enabled = false;
986 }
987
988 if self.inner.checks_enabled {
989 panic!("unexpected write {}", self.pmsg());
990 } else {
991 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
992 }
993 }
994 }
995 }
996
997 match self.inner.write(buf) {
998 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
999 if let Some(rem) = self.inner.remaining_wait() {
1000 let until = Instant::now() + rem;
1001 self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
1002 } else {
1003 #[cfg(feature = "panicless-mode")]
1004 if self.inner.checks_enabled && self.inner.panicless_tx.is_some() {
1005 let _ = self.inner.panicless_tx.take().unwrap().send(MockOutcome {
1006 outcome: Err(MockOutcomeError::Other),
1007 total_read_bytes: self.inner.read_bytes,
1008 total_written_bytes: self.inner.written_bytes,
1009 });
1010 self.inner.checks_enabled = false;
1011 }
1012
1013 if self.inner.checks_enabled {
1014 panic!("unexpected WouldBlock {}", self.pmsg());
1015 } else {
1016 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
1017 }
1018 }
1019 }
1020 Ok(0) => {
1021 if self.inner.action().is_some() {
1023 return Poll::Pending;
1024 }
1025
1026 match ready!(self.inner.poll_action(cx)) {
1028 Some(action) => {
1029 self.inner.actions.push_back(action);
1030 continue;
1031 }
1032 None => {
1033 #[cfg(feature = "panicless-mode")]
1034 if self.inner.checks_enabled && self.inner.panicless_tx.is_some() {
1035 let _ = self.inner.panicless_tx.take().unwrap().send(MockOutcome {
1036 outcome: Err(MockOutcomeError::UnexpectedWrite),
1037 total_read_bytes: self.inner.read_bytes,
1038 total_written_bytes: self.inner.written_bytes,
1039 });
1040 self.inner.checks_enabled = false;
1041 }
1042
1043 if self.inner.checks_enabled {
1044 panic!("unexpected write {}", self.pmsg());
1045 } else {
1046 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
1047 }
1048 }
1049 }
1050 }
1051 ret => {
1052 self.maybe_wakeup_reader();
1053 return Poll::Ready(ret);
1054 }
1055 }
1056 }
1057 }
1058
1059 fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
1060 Poll::Ready(Ok(()))
1061 }
1062
1063 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
1064 if !self.inner.shutdown_checking_enabled {
1065 return Poll::Ready(Ok(()));
1066 }
1067 loop {
1068 if let Some(ref mut sleep) = self.inner.sleep {
1069 ready!(Pin::new(sleep).poll(cx));
1070 }
1071
1072 self.inner.sleep = None;
1074
1075 if self.inner.actions.is_empty() {
1076 match self.inner.poll_action(cx) {
1077 Poll::Pending => {
1078 }
1080 Poll::Ready(Some(action)) => {
1081 self.inner.actions.push_back(action);
1082 }
1083 Poll::Ready(None) => {
1084 return Poll::Ready(Ok(()));
1086 }
1087 }
1088 }
1089
1090 match self.inner.shutdown() {
1091 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1092 if let Some(rem) = self.inner.remaining_wait() {
1093 let until = Instant::now() + rem;
1094 self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
1095 } else {
1096 #[cfg(feature = "panicless-mode")]
1097 if self.inner.checks_enabled && self.inner.panicless_tx.is_some() {
1098 let _ = self.inner.panicless_tx.take().unwrap().send(MockOutcome {
1099 outcome: Err(MockOutcomeError::Other),
1100 total_read_bytes: self.inner.read_bytes,
1101 total_written_bytes: self.inner.written_bytes,
1102 });
1103 self.inner.checks_enabled = false;
1104 }
1105
1106 if self.inner.checks_enabled {
1107 panic!("unexpected WouldBlock {}", self.pmsg());
1108 } else {
1109 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
1110 }
1111 }
1112 }
1113 ret => {
1114 self.maybe_wakeup_reader();
1115 return Poll::Ready(ret);
1116 }
1117 }
1118 }
1119 }
1120}
1121
1122impl Drop for Mock {
1124 fn drop(&mut self) {
1125 #[cfg(feature = "panicless-mode")]
1126 if let Some(tx) = self.inner.panicless_tx.take() {
1127 let mut outcome = Ok(());
1128
1129 if self.inner.checks_enabled {
1130 self.inner.actions.iter().for_each(|a| match a {
1131 Action::Read(data) if !data.is_empty() => {
1132 outcome = Err(MockOutcomeError::RemainingUnreadData)
1133 }
1134 Action::ReadZeroes(nbytes) if *nbytes > 0 => {
1135 outcome = Err(MockOutcomeError::RemainingUnreadData)
1136 }
1137 Action::ReadEof(observed) if !observed => {
1138 outcome = Err(MockOutcomeError::RemainingUnreadData)
1139 }
1140 Action::Write(data) if !data.is_empty() => {
1141 outcome = Err(MockOutcomeError::RemainingUnwrittenData)
1142 }
1143 Action::WriteZeroes(nbytes) if *nbytes > 0 => {
1144 outcome = Err(MockOutcomeError::RemainingUnwrittenData)
1145 }
1146 Action::IgnoreWritten(nbytes) if *nbytes > 0 => {
1147 outcome = Err(MockOutcomeError::RemainingUnwrittenData)
1148 }
1149 _ => (),
1150 });
1151 }
1152
1153 let _ = tx.send(MockOutcome {
1154 outcome,
1155 total_read_bytes: self.inner.read_bytes,
1156 total_written_bytes: self.inner.written_bytes,
1157 });
1158 return;
1159 }
1160
1161 if std::thread::panicking() {
1163 return;
1164 }
1165
1166 if !self.inner.checks_enabled {
1167 return;
1168 }
1169
1170 self.inner.actions.iter().for_each(|a| match a {
1171 Action::Read(data) => assert!(
1172 data.is_empty(),
1173 "There is still data left to read. {}",
1174 self.pmsg()
1175 ),
1176 Action::ReadZeroes(nbytes) => assert!(
1177 *nbytes == 0,
1178 "There is still data left to read. {}",
1179 self.pmsg()
1180 ),
1181 Action::ReadEof(observed) => assert!(
1182 observed,
1183 "There is still a read EOF event that was not observed {}",
1184 self.pmsg()
1185 ),
1186 Action::Write(data) => assert!(
1187 data.is_empty(),
1188 "There is still data left to write. {}",
1189 self.pmsg()
1190 ),
1191 Action::WriteZeroes(nbytes) => assert!(
1192 *nbytes == 0,
1193 "There is still data left to write. {}",
1194 self.pmsg()
1195 ),
1196 Action::IgnoreWritten(nbytes) => assert!(
1197 *nbytes == 0,
1198 "There is still data left to write (even though content is to be ignored). {}",
1199 self.pmsg()
1200 ),
1201 _ => (),
1202 });
1203 }
1204}
1205impl fmt::Debug for Inner {
1228 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1229 if self.name.is_empty() {
1230 write!(f, "Inner {{...}}")
1231 } else {
1232 write!(f, "Inner {{name={}, ...}}", self.name)
1233 }
1234 }
1235}
1236
1237struct PanicMsgSnippet<'a>(&'a Inner);
1238
1239impl<'a> fmt::Display for PanicMsgSnippet<'a> {
1240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1241 if self.0.name.is_empty() {
1242 write!(
1243 f,
1244 "({} actions remain, {} bytes was read, {} bytes was written)",
1245 self.0.actions.len(),
1246 self.0.read_bytes,
1247 self.0.written_bytes,
1248 )
1249 } else {
1250 write!(
1251 f,
1252 "(name {}, {} actions remain, {} bytes was read, {} bytes was written)",
1253 self.0.name,
1254 self.0.actions.len(),
1255 self.0.read_bytes,
1256 self.0.written_bytes,
1257 )
1258 }
1259 }
1260}
1261
1262impl Mock {
1263 fn pmsg(&self) -> PanicMsgSnippet<'_> {
1264 PanicMsgSnippet(&self.inner)
1265 }
1266}