use super::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryStreamError<E> {
Inner(E),
PolledAfterCompletion,
}
impl<E: std::fmt::Display> std::fmt::Display for TryStreamError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Inner(err) => err.fmt(f),
Self::PolledAfterCompletion => {
write!(f, "try_stream future polled after completion")
}
}
}
}
impl<E: std::error::Error + 'static> std::error::Error for TryStreamError<E> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Inner(err) => Some(err),
Self::PolledAfterCompletion => None,
}
}
}
impl<E> From<E> for TryStreamError<E> {
#[inline]
fn from(value: E) -> Self {
Self::Inner(value)
}
}
const TRY_STREAM_COOPERATIVE_BUDGET: usize = 1024;
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct TryCollect<S, C> {
stream: S,
collection: C,
completed: bool,
}
impl<S, C> TryCollect<S, C> {
#[inline]
pub(crate) fn new(stream: S, collection: C) -> Self {
Self {
stream,
collection,
completed: false,
}
}
}
impl<S: Unpin, C> Unpin for TryCollect<S, C> {}
impl<S, T, E, C> Future for TryCollect<S, C>
where
S: Stream<Item = Result<T, E>> + Unpin,
C: Default + Extend<T>,
{
type Output = Result<C, TryStreamError<E>>;
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.completed {
return Poll::Ready(Err(TryStreamError::PolledAfterCompletion));
}
let mut processed_this_poll = 0usize;
loop {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
self.collection.extend(std::iter::once(item));
processed_this_poll += 1;
if processed_this_poll >= TRY_STREAM_COOPERATIVE_BUDGET {
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
Poll::Ready(Some(Err(e))) => {
self.completed = true;
return Poll::Ready(Err(TryStreamError::Inner(e)));
}
Poll::Ready(None) => {
self.completed = true;
return Poll::Ready(Ok(std::mem::take(&mut self.collection)));
}
Poll::Pending => return Poll::Pending,
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct TryFold<S, F, Acc> {
stream: S,
f: F,
acc: Option<Acc>,
completed: bool,
}
impl<S, F, Acc> TryFold<S, F, Acc> {
#[inline]
pub(crate) fn new(stream: S, init: Acc, f: F) -> Self {
Self {
stream,
f,
acc: Some(init),
completed: false,
}
}
}
impl<S: Unpin, F, Acc> Unpin for TryFold<S, F, Acc> {}
impl<S, F, Acc, T, E> Future for TryFold<S, F, Acc>
where
S: Stream<Item = Result<T, E>> + Unpin,
F: FnMut(Acc, T) -> Result<Acc, E>,
{
type Output = Result<Acc, TryStreamError<E>>;
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.completed {
return Poll::Ready(Err(TryStreamError::PolledAfterCompletion));
}
let mut processed_this_poll = 0usize;
loop {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
let acc = self
.acc
.take()
.expect("TryFold accumulator missing before completion");
match (self.f)(acc, item) {
Ok(new_acc) => self.acc = Some(new_acc),
Err(e) => {
self.completed = true;
return Poll::Ready(Err(TryStreamError::Inner(e)));
}
}
processed_this_poll += 1;
if processed_this_poll >= TRY_STREAM_COOPERATIVE_BUDGET {
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
Poll::Ready(Some(Err(e))) => {
self.completed = true;
return Poll::Ready(Err(TryStreamError::Inner(e)));
}
Poll::Ready(None) => {
self.completed = true;
return Poll::Ready(Ok(self
.acc
.take()
.expect("TryFold accumulator missing on completion")));
}
Poll::Pending => return Poll::Pending,
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct TryForEach<S, F> {
stream: S,
f: F,
completed: bool,
}
impl<S, F> TryForEach<S, F> {
#[inline]
pub(crate) fn new(stream: S, f: F) -> Self {
Self {
stream,
f,
completed: false,
}
}
}
impl<S: Unpin, F> Unpin for TryForEach<S, F> {}
impl<S, F, E> Future for TryForEach<S, F>
where
S: Stream + Unpin,
F: FnMut(S::Item) -> Result<(), E>,
{
type Output = Result<(), TryStreamError<E>>;
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.completed {
return Poll::Ready(Err(TryStreamError::PolledAfterCompletion));
}
let mut processed_this_poll = 0usize;
loop {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(item)) => {
if let Err(e) = (self.f)(item) {
self.completed = true;
return Poll::Ready(Err(TryStreamError::Inner(e)));
}
processed_this_poll += 1;
if processed_this_poll >= TRY_STREAM_COOPERATIVE_BUDGET {
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
Poll::Ready(None) => {
self.completed = true;
return Poll::Ready(Ok(()));
}
Poll::Pending => return Poll::Pending,
}
}
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use crate::stream::iter;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
struct TrackWaker(Arc<AtomicBool>);
use std::task::Wake;
impl Wake for TrackWaker {
fn wake(self: Arc<Self>) {
self.0.store(true, Ordering::SeqCst);
}
fn wake_by_ref(self: &Arc<Self>) {
self.0.store(true, Ordering::SeqCst);
}
}
#[derive(Debug, Default)]
struct AlwaysReadyOkStream {
next: usize,
end: usize,
}
impl AlwaysReadyOkStream {
fn new(end: usize) -> Self {
Self { next: 0, end }
}
}
impl Stream for AlwaysReadyOkStream {
type Item = Result<usize, &'static str>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.next >= self.end {
return Poll::Ready(None);
}
let item = self.next;
self.next += 1;
Poll::Ready(Some(Ok(item)))
}
}
#[derive(Debug, Default)]
struct AlwaysReadyValueStream {
next: usize,
end: usize,
}
impl AlwaysReadyValueStream {
fn new(end: usize) -> Self {
Self { next: 0, end }
}
}
impl Stream for AlwaysReadyValueStream {
type Item = usize;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.next >= self.end {
return Poll::Ready(None);
}
let item = self.next;
self.next += 1;
Poll::Ready(Some(item))
}
}
#[derive(Debug)]
struct PollCountingEmptyTryStream {
polls: Arc<AtomicUsize>,
}
impl PollCountingEmptyTryStream {
fn new(polls: Arc<AtomicUsize>) -> Self {
Self { polls }
}
}
impl Stream for PollCountingEmptyTryStream {
type Item = Result<i32, &'static str>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.polls.fetch_add(1, Ordering::SeqCst);
Poll::Ready(None)
}
}
#[derive(Debug)]
struct PollCountingEmptyValueStream {
polls: Arc<AtomicUsize>,
}
impl PollCountingEmptyValueStream {
fn new(polls: Arc<AtomicUsize>) -> Self {
Self { polls }
}
}
impl Stream for PollCountingEmptyValueStream {
type Item = i32;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.polls.fetch_add(1, Ordering::SeqCst);
Poll::Ready(None)
}
}
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn poll_ready<F>(future: &mut F) -> F::Output
where
F: Future + Unpin,
{
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(future).poll(&mut cx) {
Poll::Ready(output) => output,
Poll::Pending => panic!("expected immediately-ready try stream future"),
}
}
fn ok_items(values: &[i32]) -> Vec<Result<i32, &'static str>> {
values
.iter()
.copied()
.map(Ok::<i32, &'static str>)
.collect()
}
fn collect_result(
items: Vec<Result<i32, &'static str>>,
) -> Result<Vec<i32>, TryStreamError<&'static str>> {
let mut future = TryCollect::new(iter(items), Vec::new());
poll_ready(&mut future)
}
fn fold_sum(
items: Vec<Result<i32, &'static str>>,
seed: i32,
) -> Result<i32, TryStreamError<&'static str>> {
let mut future = TryFold::new(iter(items), seed, |acc, x| Ok::<i32, &'static str>(acc + x));
poll_ready(&mut future)
}
fn for_each_record(
items: Vec<i32>,
fail_at: Option<i32>,
) -> (Result<(), TryStreamError<&'static str>>, Vec<i32>) {
let mut recorded = Vec::new();
let output = {
let mut future = TryForEach::new(iter(items), |item| {
if Some(item) == fail_at {
Err("for_each failure")
} else {
recorded.push(item);
Ok(())
}
});
poll_ready(&mut future)
};
(output, recorded)
}
#[test]
fn try_collect_success() {
init_test("try_collect_success");
let items: Vec<Result<i32, &str>> = vec![Ok(1), Ok(2), Ok(3)];
let mut future = TryCollect::new(iter(items), Vec::new());
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Ok(collected)) => {
let ok = collected == vec![1, 2, 3];
crate::assert_with_log!(ok, "collected", vec![1, 2, 3], collected);
}
Poll::Ready(Err(_)) => panic!("expected Ok"),
Poll::Pending => panic!("expected Ready"),
}
crate::test_complete!("try_collect_success");
}
#[test]
fn try_collect_error() {
init_test("try_collect_error");
let items: Vec<Result<i32, &str>> = vec![Ok(1), Err("error"), Ok(3)];
let mut future = TryCollect::new(iter(items), Vec::new());
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Err(TryStreamError::Inner(e))) => {
let ok = e == "error";
crate::assert_with_log!(ok, "error", "error", e);
}
Poll::Ready(Err(TryStreamError::PolledAfterCompletion)) => {
panic!("unexpected PolledAfterCompletion")
}
Poll::Ready(Ok(_)) => panic!("expected Err"),
Poll::Pending => panic!("expected Ready"),
}
crate::test_complete!("try_collect_error");
}
#[test]
fn try_collect_empty() {
init_test("try_collect_empty");
let items: Vec<Result<i32, &str>> = vec![];
let mut future = TryCollect::new(iter(items), Vec::new());
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Ok(collected)) => {
let empty = collected.is_empty();
crate::assert_with_log!(empty, "collected empty", true, empty);
}
Poll::Ready(Err(_)) => panic!("expected Ok"),
Poll::Pending => panic!("expected Ready"),
}
crate::test_complete!("try_collect_empty");
}
#[test]
fn mr_try_collect_partition_matches_combined_ok_input() {
init_test("mr_try_collect_partition_matches_combined_ok_input");
let cases = vec![
Vec::new(),
vec![5],
vec![-3, 0, 8, 13],
(0..17).map(|index| index * 2 - 9).collect(),
];
for values in cases {
let combined = collect_result(ok_items(&values));
crate::assert_with_log!(
combined == Ok(values.clone()),
"combined collection is identity",
Ok::<Vec<i32>, TryStreamError<&'static str>>(values.clone()),
combined.clone()
);
for split in 0..=values.len() {
let mut segmented = collect_result(ok_items(&values[..split]))
.expect("left ok partition should collect");
segmented.extend(
collect_result(ok_items(&values[split..]))
.expect("right ok partition should collect"),
);
crate::assert_with_log!(
segmented == values,
"segmented collection matches combined input",
values.clone(),
segmented
);
}
}
crate::test_complete!("mr_try_collect_partition_matches_combined_ok_input");
}
#[test]
fn mr_try_collect_first_error_is_suffix_invariant() {
init_test("mr_try_collect_first_error_is_suffix_invariant");
let suffixes: Vec<Vec<Result<i32, &'static str>>> = vec![
vec![],
vec![Ok(99), Ok(100)],
vec![Err("later error"), Ok(-1), Err("last error")],
];
for suffix in suffixes {
let mut items = vec![Ok(1), Ok(2), Err("first error")];
items.extend(suffix);
let result = collect_result(items);
crate::assert_with_log!(
result == Err(TryStreamError::Inner("first error")),
"first stream error wins regardless of suffix",
Err::<Vec<i32>, TryStreamError<&'static str>>(TryStreamError::Inner("first error")),
result
);
}
crate::test_complete!("mr_try_collect_first_error_is_suffix_invariant");
}
#[test]
fn try_fold_success() {
init_test("try_fold_success");
let items: Vec<Result<i32, &str>> = vec![Ok(1), Ok(2), Ok(3)];
let mut future = TryFold::new(iter(items), 0i32, |acc, x| Ok(acc + x));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Ok(sum)) => {
let ok = sum == 6;
crate::assert_with_log!(ok, "sum", 6, sum);
}
Poll::Ready(Err(_)) => panic!("expected Ok"),
Poll::Pending => panic!("expected Ready"),
}
crate::test_complete!("try_fold_success");
}
#[test]
fn try_fold_stream_error() {
init_test("try_fold_stream_error");
let items: Vec<Result<i32, &str>> = vec![Ok(1), Err("stream error"), Ok(3)];
let mut future = TryFold::new(iter(items), 0i32, |acc, x| Ok(acc + x));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Err(TryStreamError::Inner(e))) => {
let ok = e == "stream error";
crate::assert_with_log!(ok, "stream error", "stream error", e);
}
Poll::Ready(Err(TryStreamError::PolledAfterCompletion)) => {
panic!("unexpected PolledAfterCompletion")
}
Poll::Ready(Ok(_)) => panic!("expected Err"),
Poll::Pending => panic!("expected Ready"),
}
crate::test_complete!("try_fold_stream_error");
}
#[test]
fn try_fold_closure_error() {
init_test("try_fold_closure_error");
let items: Vec<Result<i32, &str>> = vec![Ok(1), Ok(2), Ok(3)];
let mut future = TryFold::new(iter(items), 0i32, |acc, x| {
if x == 2 {
Err("closure error")
} else {
Ok(acc + x)
}
});
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Err(TryStreamError::Inner(e))) => {
let ok = e == "closure error";
crate::assert_with_log!(ok, "closure error", "closure error", e);
}
Poll::Ready(Err(TryStreamError::PolledAfterCompletion)) => {
panic!("unexpected PolledAfterCompletion")
}
Poll::Ready(Ok(_)) => panic!("expected Err"),
Poll::Pending => panic!("expected Ready"),
}
crate::test_complete!("try_fold_closure_error");
}
#[test]
fn mr_try_fold_partition_matches_seeded_suffix_for_ok_input() {
init_test("mr_try_fold_partition_matches_seeded_suffix_for_ok_input");
let cases = vec![
Vec::new(),
vec![7],
vec![-4, 2, 9, 11],
(0..23).map(|index| index % 5 - 2).collect(),
];
let seed = 31;
for values in cases {
let combined =
fold_sum(ok_items(&values), seed).expect("combined ok input should fold");
for split in 0..=values.len() {
let left_acc = fold_sum(ok_items(&values[..split]), seed)
.expect("left ok partition should fold");
let segmented = fold_sum(ok_items(&values[split..]), left_acc)
.expect("right ok partition should fold");
crate::assert_with_log!(
segmented == combined,
"fold over partitioned stream matches combined fold",
combined,
segmented
);
}
}
crate::test_complete!("mr_try_fold_partition_matches_seeded_suffix_for_ok_input");
}
#[test]
fn mr_try_fold_first_error_is_suffix_invariant() {
init_test("mr_try_fold_first_error_is_suffix_invariant");
let suffixes: Vec<Vec<Result<i32, &'static str>>> = vec![
vec![],
vec![Ok(10), Ok(20)],
vec![Err("later error"), Ok(30), Err("last error")],
];
for suffix in suffixes {
let mut items = vec![Ok(3), Ok(4), Err("first fold error")];
items.extend(suffix);
let result = fold_sum(items, 11);
crate::assert_with_log!(
result == Err(TryStreamError::Inner("first fold error")),
"first fold stream error wins regardless of suffix",
Err::<i32, TryStreamError<&'static str>>(TryStreamError::Inner("first fold error")),
result
);
}
crate::test_complete!("mr_try_fold_first_error_is_suffix_invariant");
}
#[test]
fn try_fold_repoll_after_completion_does_not_repoll_stream() {
init_test("try_fold_repoll_after_completion_does_not_repoll_stream");
let polls = Arc::new(AtomicUsize::new(0));
let mut future = TryFold::new(
PollCountingEmptyTryStream::new(polls.clone()),
7i32,
|acc, x| Ok::<i32, &'static str>(acc + x),
);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
first == Poll::Ready(Ok(7)),
"first poll returns final accumulator",
Poll::Ready(Ok::<i32, TryStreamError<&'static str>>(7)),
first
);
crate::assert_with_log!(
polls.load(Ordering::SeqCst) == 1,
"first poll touches upstream once",
1,
polls.load(Ordering::SeqCst)
);
let second = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
matches!(
second,
Poll::Ready(Err(TryStreamError::<&'static str>::PolledAfterCompletion))
),
"second poll fails closed",
"Poll::Ready(Err(TryStreamError::PolledAfterCompletion))",
second
);
crate::assert_with_log!(
polls.load(Ordering::SeqCst) == 1,
"second poll does not touch upstream",
1,
polls.load(Ordering::SeqCst)
);
crate::test_complete!("try_fold_repoll_after_completion_does_not_repoll_stream");
}
#[test]
fn try_collect_repoll_after_completion_does_not_repoll_stream() {
init_test("try_collect_repoll_after_completion_does_not_repoll_stream");
let polls = Arc::new(AtomicUsize::new(0));
let mut future = TryCollect::new(
PollCountingEmptyTryStream::new(polls.clone()),
Vec::<i32>::new(),
);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
first == Poll::Ready(Ok::<Vec<i32>, TryStreamError<&'static str>>(Vec::new())),
"first poll completes empty collect",
Poll::Ready(Ok::<Vec<i32>, TryStreamError<&'static str>>(Vec::new())),
first
);
crate::assert_with_log!(
polls.load(Ordering::SeqCst) == 1,
"first poll touches upstream once",
1,
polls.load(Ordering::SeqCst)
);
let second = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
matches!(
second,
Poll::Ready(Err(TryStreamError::<&'static str>::PolledAfterCompletion))
),
"second poll fails closed",
"Poll::Ready(Err(TryStreamError::PolledAfterCompletion))",
second
);
crate::assert_with_log!(
polls.load(Ordering::SeqCst) == 1,
"second poll does not touch upstream",
1,
polls.load(Ordering::SeqCst)
);
crate::test_complete!("try_collect_repoll_after_completion_does_not_repoll_stream");
}
#[test]
fn try_for_each_success() {
init_test("try_for_each_success");
let mut results = Vec::new();
let mut future = TryForEach::new(iter(vec![1i32, 2, 3]), |x| {
results.push(x);
Ok::<(), &str>(())
});
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Ok(())) => {
let ok = results == vec![1, 2, 3];
crate::assert_with_log!(ok, "results", vec![1, 2, 3], results);
}
Poll::Ready(Err(_)) => panic!("expected Ok"),
Poll::Pending => panic!("expected Ready"),
}
crate::test_complete!("try_for_each_success");
}
#[test]
fn try_for_each_error() {
init_test("try_for_each_error");
let mut results = Vec::new();
let mut future = TryForEach::new(iter(vec![1i32, 2, 3]), |x| {
if x == 2 {
Err("error at 2")
} else {
results.push(x);
Ok(())
}
});
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Err(TryStreamError::Inner(e))) => {
let err_ok = e == "error at 2";
crate::assert_with_log!(err_ok, "error", "error at 2", e);
let ok = results == vec![1];
crate::assert_with_log!(ok, "results", vec![1], results);
}
Poll::Ready(Err(TryStreamError::PolledAfterCompletion)) => {
panic!("unexpected PolledAfterCompletion")
}
Poll::Ready(Ok(())) => panic!("expected Err"),
Poll::Pending => panic!("expected Ready"),
}
crate::test_complete!("try_for_each_error");
}
#[test]
fn mr_try_for_each_partition_matches_combined_side_effects() {
init_test("mr_try_for_each_partition_matches_combined_side_effects");
let cases = vec![
Vec::new(),
vec![1],
vec![-2, 0, 4, 9],
(0..19).map(|index| index * 3 - 17).collect(),
];
for values in cases {
let (combined_result, combined_recorded) = for_each_record(values.clone(), None);
crate::assert_with_log!(
combined_result == Ok(()),
"combined for_each succeeds",
Ok::<(), TryStreamError<&'static str>>(()),
combined_result
);
for split in 0..=values.len() {
let (left_result, mut segmented) = for_each_record(values[..split].to_vec(), None);
let (right_result, right_recorded) =
for_each_record(values[split..].to_vec(), None);
segmented.extend(right_recorded);
crate::assert_with_log!(
left_result == Ok(()) && right_result == Ok(()),
"partitioned for_each succeeds",
(
Ok::<(), TryStreamError<&'static str>>(()),
Ok::<(), TryStreamError<&'static str>>(())
),
(&left_result, &right_result)
);
crate::assert_with_log!(
segmented == combined_recorded,
"partitioned side effects match combined order",
combined_recorded.clone(),
segmented
);
}
}
crate::test_complete!("mr_try_for_each_partition_matches_combined_side_effects");
}
#[test]
fn mr_try_for_each_closure_error_is_suffix_invariant() {
init_test("mr_try_for_each_closure_error_is_suffix_invariant");
let suffixes = vec![vec![], vec![4, 5], vec![9, 3, 10]];
for suffix in suffixes {
let mut items = vec![1, 2, 3];
items.extend(suffix);
let (result, recorded) = for_each_record(items, Some(3));
crate::assert_with_log!(
result == Err(TryStreamError::Inner("for_each failure")),
"first closure error wins regardless of suffix",
Err::<(), TryStreamError<&'static str>>(TryStreamError::Inner("for_each failure")),
result
);
crate::assert_with_log!(
recorded == vec![1, 2],
"side effects stop before failing item and suffix",
vec![1, 2],
recorded
);
}
crate::test_complete!("mr_try_for_each_closure_error_is_suffix_invariant");
}
#[test]
fn try_for_each_repoll_after_completion_does_not_repoll_stream() {
init_test("try_for_each_repoll_after_completion_does_not_repoll_stream");
let polls = Arc::new(AtomicUsize::new(0));
let mut future = TryForEach::new(PollCountingEmptyValueStream::new(polls.clone()), |_| {
Ok::<(), &'static str>(())
});
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
first == Poll::Ready(Ok::<(), TryStreamError<&'static str>>(())),
"first poll completes empty for_each",
Poll::Ready(Ok::<(), TryStreamError<&'static str>>(())),
first
);
crate::assert_with_log!(
polls.load(Ordering::SeqCst) == 1,
"first poll touches upstream once",
1,
polls.load(Ordering::SeqCst)
);
let second = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
matches!(
second,
Poll::Ready(Err(TryStreamError::<&'static str>::PolledAfterCompletion))
),
"second poll fails closed",
"Poll::Ready(Err(TryStreamError::PolledAfterCompletion))",
second
);
crate::assert_with_log!(
polls.load(Ordering::SeqCst) == 1,
"second poll does not touch upstream",
1,
polls.load(Ordering::SeqCst)
);
crate::test_complete!("try_for_each_repoll_after_completion_does_not_repoll_stream");
}
#[test]
fn try_collect_yields_after_budget_on_always_ready_success_stream() {
init_test("try_collect_yields_after_budget_on_always_ready_success_stream");
let mut future = TryCollect::new(
AlwaysReadyOkStream::new(TRY_STREAM_COOPERATIVE_BUDGET + 5),
Vec::new(),
);
let woke = Arc::new(AtomicBool::new(false));
let waker = Waker::from(Arc::new(TrackWaker(woke.clone())));
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
matches!(first, Poll::Pending),
"first poll yields cooperatively",
"Poll::Pending",
first
);
crate::assert_with_log!(
future.collection.len() == TRY_STREAM_COOPERATIVE_BUDGET,
"collection preserved across yield",
TRY_STREAM_COOPERATIVE_BUDGET,
future.collection.len()
);
crate::assert_with_log!(
future.stream.next == TRY_STREAM_COOPERATIVE_BUDGET,
"upstream advanced only to budget",
TRY_STREAM_COOPERATIVE_BUDGET,
future.stream.next
);
crate::assert_with_log!(
woke.load(Ordering::SeqCst),
"self-wake requested",
true,
woke.load(Ordering::SeqCst)
);
let second = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
second
== Poll::Ready(Ok::<Vec<usize>, TryStreamError<&'static str>>(
(0..TRY_STREAM_COOPERATIVE_BUDGET + 5).collect(),
)),
"second poll completes collection",
Poll::Ready(Ok::<Vec<usize>, TryStreamError<&'static str>>(
(0..TRY_STREAM_COOPERATIVE_BUDGET + 5).collect::<Vec<_>>()
)),
second
);
crate::test_complete!("try_collect_yields_after_budget_on_always_ready_success_stream");
}
#[test]
fn try_fold_yields_after_budget_on_always_ready_success_stream() {
init_test("try_fold_yields_after_budget_on_always_ready_success_stream");
let mut future = TryFold::new(
AlwaysReadyOkStream::new(TRY_STREAM_COOPERATIVE_BUDGET + 5),
0usize,
|acc, x| Ok::<usize, &'static str>(acc + x),
);
let woke = Arc::new(AtomicBool::new(false));
let waker = Waker::from(Arc::new(TrackWaker(woke.clone())));
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
matches!(first, Poll::Pending),
"first poll yields cooperatively",
"Poll::Pending",
first
);
crate::assert_with_log!(
future.acc == Some((0..TRY_STREAM_COOPERATIVE_BUDGET).sum()),
"accumulator preserved across yield",
Some((0..TRY_STREAM_COOPERATIVE_BUDGET).sum::<usize>()),
future.acc
);
crate::assert_with_log!(
future.stream.next == TRY_STREAM_COOPERATIVE_BUDGET,
"upstream advanced only to budget",
TRY_STREAM_COOPERATIVE_BUDGET,
future.stream.next
);
crate::assert_with_log!(
woke.load(Ordering::SeqCst),
"self-wake requested",
true,
woke.load(Ordering::SeqCst)
);
let second = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
second
== Poll::Ready(Ok::<usize, TryStreamError<&'static str>>(
(0..TRY_STREAM_COOPERATIVE_BUDGET + 5).sum(),
)),
"second poll completes fold",
Poll::Ready(Ok::<usize, TryStreamError<&'static str>>(
(0..TRY_STREAM_COOPERATIVE_BUDGET + 5).sum::<usize>()
)),
second
);
crate::test_complete!("try_fold_yields_after_budget_on_always_ready_success_stream");
}
#[test]
fn try_for_each_yields_after_budget_on_always_ready_success_stream() {
use std::cell::RefCell;
use std::rc::Rc;
init_test("try_for_each_yields_after_budget_on_always_ready_success_stream");
let results = Rc::new(RefCell::new(Vec::new()));
let results_clone = results.clone();
let mut future = TryForEach::new(
AlwaysReadyValueStream::new(TRY_STREAM_COOPERATIVE_BUDGET + 5),
move |x| {
results_clone.borrow_mut().push(x);
Ok::<(), &'static str>(())
},
);
let woke = Arc::new(AtomicBool::new(false));
let waker = Waker::from(Arc::new(TrackWaker(woke.clone())));
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
matches!(first, Poll::Pending),
"first poll yields cooperatively",
"Poll::Pending",
first
);
crate::assert_with_log!(
results.borrow().len() == TRY_STREAM_COOPERATIVE_BUDGET,
"side effects preserved across yield",
TRY_STREAM_COOPERATIVE_BUDGET,
results.borrow().len()
);
crate::assert_with_log!(
future.stream.next == TRY_STREAM_COOPERATIVE_BUDGET,
"upstream advanced only to budget",
TRY_STREAM_COOPERATIVE_BUDGET,
future.stream.next
);
crate::assert_with_log!(
woke.load(Ordering::SeqCst),
"self-wake requested",
true,
woke.load(Ordering::SeqCst)
);
let second = Pin::new(&mut future).poll(&mut cx);
crate::assert_with_log!(
second == Poll::Ready(Ok::<(), TryStreamError<&'static str>>(())),
"second poll completes for_each",
Poll::Ready(Ok::<(), TryStreamError<&'static str>>(())),
second
);
crate::assert_with_log!(
*results.borrow() == (0..TRY_STREAM_COOPERATIVE_BUDGET + 5).collect::<Vec<_>>(),
"all side effects observed after completion",
(0..TRY_STREAM_COOPERATIVE_BUDGET + 5).collect::<Vec<_>>(),
*results.borrow()
);
crate::test_complete!("try_for_each_yields_after_budget_on_always_ready_success_stream");
}
}