Skip to main content

co_primitives/library/
co_try_stream_ext.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use futures::{pin_mut, Stream, TryStreamExt};
5use std::{marker::PhantomData, task::Poll};
6
7#[async_trait::async_trait]
8pub trait CoTryStreamExt: Stream<Item = Result<Self::Ok, Self::Error>> {
9	type Ok;
10	type Error;
11
12	async fn try_first(self) -> Result<Option<Self::Ok>, Self::Error>
13	where
14		Self: Sized,
15	{
16		Ok(try_first(self).await?)
17	}
18
19	/// Ignore all elements by only forwarding errors.
20	fn try_ignore_elements<T>(self) -> TryIgnoreElements<Self, T>
21	where
22		Self: Sized,
23	{
24		TryIgnoreElements { inner: self, _out: PhantomData }
25	}
26}
27impl<S, T, E> CoTryStreamExt for S
28where
29	S: ?Sized + Stream<Item = Result<T, E>>,
30{
31	type Ok = T;
32	type Error = E;
33}
34
35async fn try_first<T, E, S>(stream: S) -> Result<Option<T>, E>
36where
37	S: Stream<Item = Result<T, E>> + Sized,
38{
39	pin_mut!(stream);
40	stream.try_next().await
41}
42
43#[pin_project::pin_project]
44pub struct TryIgnoreElements<S, O> {
45	#[pin]
46	inner: S,
47	_out: PhantomData<O>,
48}
49impl<S, T, E, O> Stream for TryIgnoreElements<S, O>
50where
51	S: Stream<Item = Result<T, E>>,
52{
53	type Item = Result<O, E>;
54
55	fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
56		let mut this = self.project();
57		match this.inner.as_mut().poll_next(cx) {
58			// ignore elements
59			Poll::Ready(Some(Ok(_))) => Poll::Pending,
60			// forward error
61			Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
62			// forward comlete
63			Poll::Ready(None) => Poll::Ready(None),
64			// forward pending
65			Poll::Pending => Poll::Pending,
66		}
67	}
68}