1use crate::RS2Stream;
2use futures_util::StreamExt;
3use std::sync::Arc;
4use async_stream::stream;
5
6pub struct Pipe<I, O> {
9 f: Arc<dyn Fn(RS2Stream<I>) -> RS2Stream<O> + Send + Sync + 'static>,
10}
11
12impl<I, O> Clone for Pipe<I, O> {
13 fn clone(&self) -> Self {
14 Pipe {
15 f: Arc::clone(&self.f),
16 }
17 }
18}
19
20impl<I, O> Pipe<I, O> {
21 pub fn new<F>(f: F) -> Self
23 where
24 F: Fn(RS2Stream<I>) -> RS2Stream<O> + Send + Sync + 'static,
25 {
26 Pipe {
27 f: Arc::new(f),
28 }
29 }
30
31 pub fn apply(&self, input: RS2Stream<I>) -> RS2Stream<O> {
33 (self.f)(input)
34 }
35}
36
37pub fn map<I, O, F>(f: F) -> Pipe<I, O>
39where
40 F: Fn(I) -> O + Send + Sync + Clone + 'static,
41 I: Send + 'static,
42 O: Send + 'static,
43{
44 Pipe::new(move |input| {
45 let f = f.clone();
46 input.map(move |i| f(i)).boxed()
47 })
48}
49
50pub fn filter<I, F>(predicate: F) -> Pipe<I, I>
52where
53 F: Fn(&I) -> bool + Send + Sync + Clone + 'static,
54 I: Send + 'static,
55{
56 Pipe::new(move |input| {
57 let predicate = predicate.clone();
58 stream! {
59 let mut s = input;
60 while let Some(item) = s.next().await {
61 if predicate(&item) {
62 yield item;
63 }
64 }
65 }.boxed()
66 })
67}
68
69pub fn compose<I, M, O>(p1: Pipe<I, M>, p2: Pipe<M, O>) -> Pipe<I, O>
71where
72 I: Send + 'static,
73 M: Send + 'static,
74 O: Send + 'static,
75{
76 Pipe::new(move |input| {
77 let p1 = p1.clone();
78 let p2 = p2.clone();
79 p2.apply(p1.apply(input))
80 })
81}
82
83pub fn identity<I>() -> Pipe<I, I>
85where
86 I: Send + 'static,
87{
88 Pipe::new(|input| input)
89}
90
91pub trait PipeExt<I, O> {
93 fn compose<P>(self, other: Pipe<O, P>) -> Pipe<I, P>
95 where
96 P: Send + 'static;
97}
98
99impl<I, O> PipeExt<I, O> for Pipe<I, O>
100where
101 I: Send + 'static,
102 O: Send + 'static,
103{
104 fn compose<P>(self, other: Pipe<O, P>) -> Pipe<I, P>
105 where
106 P: Send + 'static,
107 {
108 compose(self, other)
109 }
110}