use crate::RS2Stream;
use async_stream::stream;
use futures_util::StreamExt;
use std::sync::Arc;
/// A reusable transformation from an `RS2Stream<I>` into an `RS2Stream<O>`.
///
/// The transformation is held as a shared, type-erased `Fn` closure, so the
/// same pipe can be applied to any number of input streams.
pub struct Pipe<I, O> {
/// Shared stream-to-stream function; `Send + Sync + 'static` are part of the
/// trait-object type, so the stored closure must satisfy them.
f: Arc<dyn Fn(RS2Stream<I>) -> RS2Stream<O> + Send + Sync + 'static>,
}
impl<I, O> Clone for Pipe<I, O> {
fn clone(&self) -> Self {
Pipe {
f: Arc::clone(&self.f),
}
}
}
impl<I, O> Pipe<I, O> {
pub fn new<F>(f: F) -> Self
where
F: Fn(RS2Stream<I>) -> RS2Stream<O> + Send + Sync + 'static,
{
Pipe { f: Arc::new(f) }
}
pub fn apply(&self, input: RS2Stream<I>) -> RS2Stream<O> {
(self.f)(input)
}
}
/// Builds a pipe that transforms every element of the input stream with `f`.
///
/// `f` is cloned each time the pipe is applied, so every output stream owns
/// its own copy of the function.
pub fn map<I, O, F>(f: F) -> Pipe<I, O>
where
    F: Fn(I) -> O + Send + Sync + Clone + 'static,
    I: Send + 'static,
    O: Send + 'static,
{
    Pipe::new(move |upstream| {
        // The stream adaptor takes the function by value, hence the clone.
        let mapper = f.clone();
        upstream.map(mapper).boxed()
    })
}
pub fn filter<I, F>(predicate: F) -> Pipe<I, I>
where
F: Fn(&I) -> bool + Send + Sync + Clone + 'static,
I: Send + 'static,
{
Pipe::new(move |input| {
let predicate = predicate.clone();
stream! {
let mut s = input;
while let Some(item) = s.next().await {
if predicate(&item) {
yield item;
}
}
}
.boxed()
})
}
/// Chains two pipes: the resulting pipe feeds its input through `p1` and then
/// passes `p1`'s output stream through `p2`.
///
/// Both pipes are moved into the returned pipe and shared by every
/// application of it.
pub fn compose<I, M, O>(p1: Pipe<I, M>, p2: Pipe<M, O>) -> Pipe<I, O>
where
    I: Send + 'static,
    M: Send + 'static,
    O: Send + 'static,
{
    // `apply` only borrows the pipe, so the captured `p1`/`p2` are used in
    // place; cloning them on every call would just churn the `Arc` ref counts.
    Pipe::new(move |input| p2.apply(p1.apply(input)))
}
/// Builds a pipe that returns its input stream untouched.
pub fn identity<I>() -> Pipe<I, I>
where
    I: Send + 'static,
{
    // The standard pass-through function, instantiated for `RS2Stream<I>`.
    Pipe::new(std::convert::identity)
}
/// Method-style chaining for pipe-like values that go from `I` to `O`.
pub trait PipeExt<I, O> {
    /// Chains `other` after `self`, consuming both and producing a pipe from
    /// `I` to `P`.
    fn compose<P: Send + 'static>(self, other: Pipe<O, P>) -> Pipe<I, P>;
}
impl<I, O> PipeExt<I, O> for Pipe<I, O>
where
I: Send + 'static,
O: Send + 'static,
{
fn compose<P>(self, other: Pipe<O, P>) -> Pipe<I, P>
where
P: Send + 'static,
{
compose(self, other)
}
}