use crate::stream::{Stream, CloneableStreamable}; use std::marker::PhantomData;
use std::sync::Arc;
/// A reusable transformation from a [`Stream`] of `In` items to a
/// [`Stream`] of `Out` items.
///
/// The transformation is stored as a shared trait object behind an [`Arc`].
pub struct Pipe<In, Out>
where
    In: CloneableStreamable,
    Out: CloneableStreamable,
{
    /// The wrapped stream-to-stream function. The trait object is required
    /// to be `Send + Sync + 'static`.
    func: Arc<dyn Fn(Stream<In>) -> Stream<Out> + Send + Sync + 'static>,
    /// Zero-sized marker for the input element type.
    _phantom_in: PhantomData<In>,
    /// Zero-sized marker for the output element type.
    _phantom_out: PhantomData<Out>,
}
impl<In, Out> Pipe<In, Out>
where
    In: CloneableStreamable,
    Out: CloneableStreamable,
{
    /// Wraps the stream transformation `f` in a `Pipe`.
    ///
    /// `f` must be `Send + Sync + 'static` because it is stored as a shared
    /// trait object behind an [`Arc`].
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(Stream<In>) -> Stream<Out> + Send + Sync + 'static,
    {
        Self {
            func: Arc::new(f),
            _phantom_in: PhantomData,
            _phantom_out: PhantomData,
        }
    }

    /// Runs the wrapped transformation on `stream` and returns its output.
    ///
    /// Only borrows the pipe, so the same pipe can be applied repeatedly.
    pub fn apply(&self, stream: Stream<In>) -> Stream<Out> {
        let transform = &*self.func;
        transform(stream)
    }

    /// Chains two pipes: the output of `self` becomes the input of
    /// `next_pipe`.
    ///
    /// Both pipes are consumed; the returned pipe owns their functions.
    pub fn and_then<NextOut>(self, next_pipe: Pipe<Out, NextOut>) -> Pipe<In, NextOut>
    where
        NextOut: CloneableStreamable,
    {
        // Only the two `Arc` handles are moved into the closure, not the
        // `Pipe` values themselves.
        let upstream = self.func;
        let downstream = next_pipe.func;
        Pipe::new(move |input: Stream<In>| downstream(upstream(input)))
    }

    /// Chains two pipes in the opposite order to [`Pipe::and_then`]:
    /// `prev_pipe` runs first and its output is fed into `self`.
    pub fn compose<PrevIn>(self, prev_pipe: Pipe<PrevIn, In>) -> Pipe<PrevIn, Out>
    where
        PrevIn: CloneableStreamable,
    {
        Pipe::and_then(prev_pipe, self)
    }

    /// Builds a pipe that hands its input stream back unchanged.
    ///
    /// The result is always `Pipe<In, In>`; `Out` does not appear in the
    /// return type, so callers still have to name it, e.g.
    /// `Pipe::<i32, i32>::identity()`.
    pub fn identity() -> Pipe<In, In>
    where
        In: CloneableStreamable,
    {
        let pass_through = |stream: Stream<In>| stream;
        Pipe::new(pass_through)
    }
}
// Written by hand rather than derived: `#[derive(Clone)]` would put
// `In: Clone` and `Out: Clone` bounds on the impl, although cloning a pipe
// only copies the `Arc` handle.
impl<In, Out> Clone for Pipe<In, Out>
where
    In: CloneableStreamable,
    Out: CloneableStreamable,
{
    /// Returns a new `Pipe` that shares the same underlying function as
    /// `self`.
    fn clone(&self) -> Self {
        let func = Arc::clone(&self.func);
        Self {
            func,
            _phantom_in: PhantomData,
            _phantom_out: PhantomData,
        }
    }
}
/// Tests for `Pipe` construction, application, composition and cloning.
#[cfg(test)]
mod tests {
    use super::*;

    /// The identity pipe must pass every element through untouched.
    #[actix_rt::test]
    async fn test_pipe_identity() {
        let identity_pipe: Pipe<i32, i32> = Pipe::<i32, i32>::identity();
        let stream_in = Stream::emits(vec![1, 2, 3]);
        let stream_out = identity_pipe.apply(stream_in);
        let result = stream_out.compile_to_list().await;
        assert_eq!(result, Ok(vec![1, 2, 3]));
    }

    /// `and_then` runs the receiver first (map), then the argument (filter).
    #[actix_rt::test]
    async fn test_pipe_map_and_filter() {
        let map_pipe: Pipe<i32, i32> = Pipe::new(|s: Stream<i32>| s.map(|x| x * 2));
        let filter_pipe: Pipe<i32, i32> = Pipe::new(|s: Stream<i32>| s.filter(|x| *x > 5));
        let composed_pipe = map_pipe.and_then(filter_pipe);
        let stream_in = Stream::emits(vec![1, 2, 3, 4, 5]);
        let stream_out = composed_pipe.apply(stream_in);
        let result = stream_out.compile_to_list().await;
        assert_eq!(result, Ok(vec![6, 8, 10]));
    }

    /// `compose` runs the argument first (map), then the receiver (filter),
    /// so it must agree with the `and_then` test above.
    #[actix_rt::test]
    async fn test_pipe_compose() {
        let map_pipe: Pipe<i32, i32> = Pipe::new(|s: Stream<i32>| s.map(|x| x * 2));
        let filter_pipe: Pipe<i32, i32> = Pipe::new(|s: Stream<i32>| s.filter(|x| *x > 5));
        let composed_pipe = filter_pipe.compose(map_pipe);
        let stream_in = Stream::emits(vec![1, 2, 3, 4, 5]);
        let stream_out = composed_pipe.apply(stream_in);
        let result = stream_out.compile_to_list().await;
        assert_eq!(result, Ok(vec![6, 8, 10]));
    }

    /// A three-stage chain that also changes the element type along the way.
    #[actix_rt::test]
    async fn test_complex_pipe_chain() {
        let to_string_pipe: Pipe<i32, String> =
            Pipe::new(|s: Stream<i32>| s.map(|x| format!("Item: {}", x)));
        let take_pipe: Pipe<String, String> = Pipe::new(|s: Stream<String>| s.take(2));
        let filter_even_numbers_as_i32: Pipe<i32, i32> =
            Pipe::new(|s: Stream<i32>| s.filter(|x| x % 2 == 0));
        let complex_pipe: Pipe<i32, String> = filter_even_numbers_as_i32
            .and_then(to_string_pipe)
            .and_then(take_pipe);
        let stream_in = Stream::emits(vec![1, 2, 3, 4, 5, 6]);
        let stream_out = complex_pipe.apply(stream_in);
        let result = stream_out.compile_to_list().await;
        assert_eq!(result, Ok(vec!["Item: 2".to_string(), "Item: 4".to_string()]));
    }

    /// A clone must behave like the original, and a pipe must stay usable
    /// after it has already been applied once.
    #[actix_rt::test]
    async fn test_pipe_clone_and_reuse() {
        let map_pipe: Pipe<i32, i32> = Pipe::new(|s: Stream<i32>| s.map(|x| x * 2));
        let cloned_pipe = map_pipe.clone();

        let from_original = map_pipe
            .apply(Stream::emits(vec![1, 2, 3]))
            .compile_to_list()
            .await;
        let from_clone = cloned_pipe
            .apply(Stream::emits(vec![1, 2, 3]))
            .compile_to_list()
            .await;
        let from_reuse = map_pipe
            .apply(Stream::emits(vec![4, 5]))
            .compile_to_list()
            .await;

        assert_eq!(from_original, Ok(vec![2, 4, 6]));
        assert_eq!(from_clone, Ok(vec![2, 4, 6]));
        assert_eq!(from_reuse, Ok(vec![8, 10]));
    }
}