async_pipeline_for_lucas/
connect.rs1use crate::link::{Error, ErrorFuc, Linkable, Pipeline};
2use crate::Start;
3
/// One link in a pipeline chain: the preceding stage paired with the step
/// that follows it.
///
/// `P` is allowed to be unsized because it is only ever held behind a `Box`.
pub struct Connect<P, F>
where
    P: ?Sized,
{
    /// Preceding stage, kept on the heap.
    pub(crate) prev: Box<P>,
    /// Step that follows `prev`.
    pub(crate) next: F,
}
8
9impl<NXT, F, L> Linkable for Connect<L, F>
10where
11 F: Fn(L::OUT) -> NXT + Send + Sync,
12 NXT: Send + Sync,
13 L: Linkable + Send + Sync,
14{
15 type OUT = NXT;
16}
17
18#[async_trait::async_trait]
19impl<NXT, F, P> Pipeline for Connect<P, F>
20where
21 P: Pipeline + Sync + Send,
22 F: Fn(P::OUT) -> NXT + Sync + Send,
23 NXT: Send + Sync,
24{
25 type IN = P::IN;
26 async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
27 let out = self.prev.process(input).await?;
28 Ok((self.next)(out))
29 }
30}
31
32impl<NXT, F, P> Linkable for Connect<P, ErrorFuc<F>>
33where
34 F: Fn(P::OUT) -> Result<NXT, Error> + Send + Sync,
35 NXT: Send + Sync,
36 P: Linkable + Send + Sync,
37{
38 type OUT = NXT;
39}
40
41#[async_trait::async_trait]
42impl<NXT, F, P> Pipeline for Connect<P, ErrorFuc<F>>
43where
44 P: Pipeline + Sync + Send,
45 F: Fn(P::OUT) -> Result<NXT, Error> + Sync + Send,
46 NXT: Send + Sync,
47{
48 type IN = P::IN;
49 async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
50 let out = self.prev.process(input).await?;
51 (self.next.f)(out)
52 }
53}
54
55#[async_trait::async_trait]
56impl<NXT, F, IN> Pipeline for Connect<Start<IN>, ErrorFuc<F>>
57where
58 IN: Sync + Send,
59 F: Fn(IN) -> Result<NXT, Error> + Sync + Send,
60 NXT: Send + Sync,
61{
62 type IN = IN;
63 async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
64 (self.next.f)(input)
65 }
66}
67
68#[async_trait::async_trait]
69impl<NXT, F, IN> Pipeline for Connect<Start<IN>, F>
70where
71 F: Fn(IN) -> NXT + Send + Sync,
72 NXT: Send + Sync,
73 IN: Send + Sync,
74{
75 type IN = IN;
76 async fn process(self: &Self, input: IN) -> Result<NXT, Error> {
77 Ok((self.next)(input))
78 }
79}