use crate::AsyncStream;
use core::future::Future;
use std::vec::Vec;
use sugars_async_task::{AsyncTask, NotResult};
use sugars_collections::ZeroOneOrMany;
type Error = Box<dyn std::error::Error + Send + Sync>;
pub trait StreamExt<T>: Sized + 'static {
fn on_result<F>(self, f: F) -> AsyncStream<T>
where
F: FnMut(Result<T, Error>) -> Result<T, Error> + Send + 'static,
T: NotResult;
fn on_chunk<F, U>(self, f: F) -> AsyncStream<U>
where
F: FnMut(Result<T, Error>) -> U + Send + 'static,
U: Send + 'static + NotResult;
fn on_error<F>(self, f: F) -> AsyncStream<T>
where
F: FnMut(Error) + Send + 'static,
T: NotResult;
fn tap_each(self, f: impl FnMut(&T) + Send + 'static) -> AsyncStream<T>
where
T: NotResult;
fn tee_each(self, f: impl FnMut(T) + Send + 'static) -> AsyncStream<T>
where
T: NotResult;
fn map_stream<U: Send + 'static + NotResult>(
self,
f: impl FnMut(T) -> U + Send + 'static,
) -> AsyncStream<U>;
fn filter_stream(self, f: impl FnMut(&T) -> bool + Send + 'static) -> AsyncStream<T>
where
T: NotResult;
fn partition_chunks(self, chunk_size: usize) -> AsyncStream<Vec<T>>
where
Vec<T>: NotResult;
fn collect(self) -> AsyncTask<Vec<T>>
where
Vec<T>: NotResult;
fn await_result<F, Fut>(self, f: F) -> AsyncTask<()>
where
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), Error>> + Send + 'static;
fn await_ok<F, Fut>(self, f: F) -> AsyncTask<()>
where
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static;
}
impl<T: Clone + Send + 'static + NotResult> StreamExt<T> for AsyncStream<T> {
fn on_result<F>(self, mut f: F) -> AsyncStream<T>
where
F: FnMut(Result<T, Error>) -> Result<T, Error> + Send + 'static,
{
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = self;
while let Some(item) = stream.next().await {
match f(Ok(item)) {
Ok(v) => {
if tx.send(v).is_err() {
break;
}
}
Err(_) => break,
}
}
});
AsyncStream::new(rx)
}
fn on_chunk<F, U>(self, mut f: F) -> AsyncStream<U>
where
Self: Send + 'static,
F: FnMut(Result<T, Error>) -> U + Send + 'static,
U: Send + 'static + NotResult,
{
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = self;
while let Some(item) = stream.next().await {
let result = f(Ok(item));
if tx.send(result).is_err() {
break;
}
}
});
AsyncStream::new(rx)
}
fn on_error<F>(self, _f: F) -> AsyncStream<T>
where
F: FnMut(Error) + Send + 'static,
{
self
}
fn tap_each(self, mut f: impl FnMut(&T) + Send + 'static) -> AsyncStream<T> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = self;
while let Some(item) = stream.next().await {
f(&item);
if tx.send(item).is_err() {
break;
}
}
});
AsyncStream::new(rx)
}
fn tee_each(self, mut f: impl FnMut(T) + Send + 'static) -> AsyncStream<T> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = self;
while let Some(item) = stream.next().await {
f(item.clone());
if tx.send(item).is_err() {
break;
}
}
});
AsyncStream::new(rx)
}
fn map_stream<U: Send + 'static + NotResult>(
self,
mut f: impl FnMut(T) -> U + Send + 'static,
) -> AsyncStream<U> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = self;
while let Some(item) = stream.next().await {
if tx.send(f(item)).is_err() {
break;
}
}
});
AsyncStream::new(rx)
}
fn filter_stream(self, mut f: impl FnMut(&T) -> bool + Send + 'static) -> AsyncStream<T> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = self;
while let Some(item) = stream.next().await {
if f(&item) && tx.send(item).is_err() {
break;
}
}
});
AsyncStream::new(rx)
}
fn partition_chunks(self, chunk_size: usize) -> AsyncStream<Vec<T>>
where
Vec<T>: NotResult,
{
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = self;
let mut buffer = Vec::with_capacity(chunk_size);
while let Some(item) = stream.next().await {
buffer.push(item);
if buffer.len() >= chunk_size {
let chunk = std::mem::replace(&mut buffer, Vec::with_capacity(chunk_size));
if tx.send(chunk).is_err() {
break;
}
}
}
if !buffer.is_empty() {
let _ = tx.send(buffer);
}
});
AsyncStream::new(rx)
}
fn collect(self) -> AsyncTask<Vec<T>> {
self.collect_async()
}
fn await_result<F, Fut>(self, mut _f: F) -> AsyncTask<()>
where
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), Error>> + Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = tx.send(());
AsyncTask::new(ZeroOneOrMany::one(rx))
}
fn await_ok<F, Fut>(self, mut _f: F) -> AsyncTask<()>
where
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = tx.send(());
AsyncTask::new(ZeroOneOrMany::one(rx))
}
}