#![cfg(not(loom))]
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{ready, Poll};
use std::time::Duration;
use futures_core::Stream;
use std::future::Future;
use tokio::time::{sleep_until, Instant, Sleep};
/// One scripted step of a [`StreamMock`]; steps are processed in the order they were queued.
#[derive(Debug, Clone)]
enum Action<T: Unpin> {
/// Yield the wrapped value as the stream's next item.
Next(T),
/// Pause for the given duration before the following step is processed.
Wait(Duration),
}
/// Builder that records an ordered script of steps and turns it into a [`StreamMock`].
///
/// Steps are appended with [`next`](Self::next) and [`wait`](Self::wait);
/// [`build`](Self::build) hands the recorded script over to the mock stream.
#[derive(Debug, Clone)]
pub struct StreamMockBuilder<T: Unpin> {
// Scripted steps in insertion order: appended at the back here, consumed from the front by the mock.
actions: VecDeque<Action<T>>,
}
impl<T: Unpin> StreamMockBuilder<T> {
pub fn new() -> Self {
StreamMockBuilder::default()
}
pub fn next(mut self, value: T) -> Self {
self.actions.push_back(Action::Next(value));
self
}
pub fn wait(mut self, duration: Duration) -> Self {
self.actions.push_back(Action::Wait(duration));
self
}
pub fn build(self) -> StreamMock<T> {
StreamMock {
actions: self.actions,
sleep: None,
}
}
}
impl<T: Unpin> Default for StreamMockBuilder<T> {
fn default() -> Self {
StreamMockBuilder {
actions: VecDeque::new(),
}
}
}
/// A scripted [`Stream`] that replays the steps recorded by a [`StreamMockBuilder`].
///
/// # Panics
///
/// Dropping the mock while scripted items have not yet been yielded panics,
/// unless the current thread is already panicking.
#[derive(Debug)]
pub struct StreamMock<T: Unpin> {
// Remaining scripted steps; consumed from the front as the stream is polled.
actions: VecDeque<Action<T>>,
// Timer for the wait step currently in progress; `None` when no wait is pending.
sleep: Option<Pin<Box<Sleep>>>,
}
impl<T: Unpin> StreamMock<T> {
    /// Removes and returns the oldest remaining scripted step, or `None`
    /// once the script is exhausted.
    fn next_action(&mut self) -> Option<Action<T>> {
        self.actions.pop_front()
    }
}
impl<T: Unpin> Stream for StreamMock<T> {
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if let Some(ref mut sleep) = self.sleep {
ready!(Pin::new(sleep).poll(cx));
self.sleep.take();
}
match self.next_action() {
Some(action) => match action {
Action::Next(item) => Poll::Ready(Some(item)),
Action::Wait(duration) => {
self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration)));
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}
impl<T: Unpin> Drop for StreamMock<T> {
    /// Checks that every scripted item was yielded before the mock went away.
    ///
    /// Only `Next` steps count as unconsumed; trailing `Wait` steps are ignored.
    ///
    /// # Panics
    ///
    /// Panics if any `Next` step is still queued, unless the thread is
    /// already panicking.
    fn drop(&mut self) {
        // Skip the check during unwinding so a failing test does not turn into a double panic.
        if !std::thread::panicking() {
            let undropped_count = self
                .actions
                .iter()
                .filter(|step| matches!(step, Action::Next(_)))
                .count();

            assert!(
                undropped_count == 0,
                "StreamMock was dropped before all actions were consumed, {undropped_count} actions were not consumed"
            );
        }
    }
}