1use crate::RS2Stream;
2use async_stream::stream;
3use futures_util::StreamExt;
4use std::sync::Arc;
5
/// A reusable transformation from an `RS2Stream<I>` into an `RS2Stream<O>`.
///
/// The transformation is held behind an [`Arc`], so a `Pipe` can be handed
/// around and applied to any number of input streams.
pub struct Pipe<I, O> {
    // Type-erased transformation function. The `Send + Sync + 'static` bounds
    // on the trait object allow the `Arc` to be shared between threads.
    f: Arc<dyn Fn(RS2Stream<I>) -> RS2Stream<O> + Send + Sync + 'static>,
}
11
12impl<I, O> Clone for Pipe<I, O> {
13 fn clone(&self) -> Self {
14 Pipe {
15 f: Arc::clone(&self.f),
16 }
17 }
18}
19
20impl<I, O> Pipe<I, O> {
21 pub fn new<F>(f: F) -> Self
23 where
24 F: Fn(RS2Stream<I>) -> RS2Stream<O> + Send + Sync + 'static,
25 {
26 Pipe { f: Arc::new(f) }
27 }
28
29 pub fn apply(&self, input: RS2Stream<I>) -> RS2Stream<O> {
31 (self.f)(input)
32 }
33}
34
35pub fn map<I, O, F>(f: F) -> Pipe<I, O>
37where
38 F: Fn(I) -> O + Send + Sync + Clone + 'static,
39 I: Send + 'static,
40 O: Send + 'static,
41{
42 Pipe::new(move |input| {
43 let f = f.clone();
44 input.map(move |i| f(i)).boxed()
45 })
46}
47
48pub fn filter<I, F>(predicate: F) -> Pipe<I, I>
50where
51 F: Fn(&I) -> bool + Send + Sync + Clone + 'static,
52 I: Send + 'static,
53{
54 Pipe::new(move |input| {
55 let predicate = predicate.clone();
56 stream! {
57 let mut s = input;
58 while let Some(item) = s.next().await {
59 if predicate(&item) {
60 yield item;
61 }
62 }
63 }
64 .boxed()
65 })
66}
67
68pub fn compose<I, M, O>(p1: Pipe<I, M>, p2: Pipe<M, O>) -> Pipe<I, O>
70where
71 I: Send + 'static,
72 M: Send + 'static,
73 O: Send + 'static,
74{
75 Pipe::new(move |input| {
76 let p1 = p1.clone();
77 let p2 = p2.clone();
78 p2.apply(p1.apply(input))
79 })
80}
81
82pub fn identity<I>() -> Pipe<I, I>
84where
85 I: Send + 'static,
86{
87 Pipe::new(|input| input)
88}
89
90pub trait PipeExt<I, O> {
92 fn compose<P>(self, other: Pipe<O, P>) -> Pipe<I, P>
94 where
95 P: Send + 'static;
96}
97
98impl<I, O> PipeExt<I, O> for Pipe<I, O>
99where
100 I: Send + 'static,
101 O: Send + 'static,
102{
103 fn compose<P>(self, other: Pipe<O, P>) -> Pipe<I, P>
104 where
105 P: Send + 'static,
106 {
107 compose(self, other)
108 }
109}