rs2_stream/
pipe.rs

1use crate::RS2Stream;
2use futures_util::StreamExt;
3use std::sync::Arc;
4use async_stream::stream;
5
6/// A Pipe represents a rs2_stream transformation from one type to another.
7/// It's a function from Stream[I] to Stream[O].
8pub struct Pipe<I, O> {
9    f: Arc<dyn Fn(RS2Stream<I>) -> RS2Stream<O> + Send + Sync + 'static>,
10}
11
12impl<I, O> Clone for Pipe<I, O> {
13    fn clone(&self) -> Self {
14        Pipe {
15            f: Arc::clone(&self.f),
16        }
17    }
18}
19
20impl<I, O> Pipe<I, O> {
21    /// Create a new pipe from a function
22    pub fn new<F>(f: F) -> Self
23    where
24        F: Fn(RS2Stream<I>) -> RS2Stream<O> + Send + Sync + 'static,
25    {
26        Pipe {
27            f: Arc::new(f),
28        }
29    }
30
31    /// Apply this pipe to a rs2_stream
32    pub fn apply(&self, input: RS2Stream<I>) -> RS2Stream<O> {
33        (self.f)(input)
34    }
35}
36
37/// Create a pipe that applies the given function to each element
38pub fn map<I, O, F>(f: F) -> Pipe<I, O>
39where
40    F: Fn(I) -> O + Send + Sync + Clone + 'static,
41    I: Send + 'static,
42    O: Send + 'static,
43{
44    Pipe::new(move |input| {
45        let f = f.clone();
46        input.map(move |i| f(i)).boxed()
47    })
48}
49
50/// Create a pipe that filters elements based on the predicate
51pub fn filter<I, F>(predicate: F) -> Pipe<I, I>
52where
53    F: Fn(&I) -> bool + Send + Sync + Clone + 'static,
54    I: Send + 'static,
55{
56    Pipe::new(move |input| {
57        let predicate = predicate.clone();
58        stream! {
59            let mut s = input;
60            while let Some(item) = s.next().await {
61                if predicate(&item) {
62                    yield item;
63                }
64            }
65        }.boxed()
66    })
67}
68
69/// Compose two pipes together
70pub fn compose<I, M, O>(p1: Pipe<I, M>, p2: Pipe<M, O>) -> Pipe<I, O>
71where
72    I: Send + 'static,
73    M: Send + 'static,
74    O: Send + 'static,
75{
76    Pipe::new(move |input| {
77        let p1 = p1.clone();
78        let p2 = p2.clone();
79        p2.apply(p1.apply(input))
80    })
81}
82
83/// Identity pipe that doesn't transform the rs2_stream
84pub fn identity<I>() -> Pipe<I, I>
85where
86    I: Send + 'static,
87{
88    Pipe::new(|input| input)
89}
90
91/// Extension trait for pipes
92pub trait PipeExt<I, O> {
93    /// Compose this pipe with another pipe
94    fn compose<P>(self, other: Pipe<O, P>) -> Pipe<I, P>
95    where
96        P: Send + 'static;
97}
98
99impl<I, O> PipeExt<I, O> for Pipe<I, O>
100where
101    I: Send + 'static,
102    O: Send + 'static,
103{
104    fn compose<P>(self, other: Pipe<O, P>) -> Pipe<I, P>
105    where
106        P: Send + 'static,
107    {
108        compose(self, other)
109    }
110}