// `Connect`: a pipeline stage that chains an upstream stage with a follow-up function.
use crate::link::{Error, ErrorFuc, Linkable, Pipeline};
use crate::Start;
/// A pipeline stage that pairs an upstream stage with the step that follows it.
///
/// `prev` holds the upstream stage and `next` holds the step that receives its
/// output. The struct itself places no bounds on either type; the trait impls
/// in this module decide which combinations are usable.
pub struct Connect<Prev, Next> {
    /// The stage that comes before `next`.
    pub(crate) prev: Prev,
    /// The step that follows `prev`.
    pub(crate) next: Next,
}
/// A `Connect` whose `next` is a function `Fn(Prev::OUT) -> Out` is itself
/// linkable, exposing the function's return type as its output.
impl<Prev, Func, Out> Linkable for Connect<Prev, Func>
where
    Prev: Linkable + Send + Sync,
    Func: Fn(Prev::OUT) -> Out + Send + Sync,
    Out: Send + Sync,
{
    /// The return type of the `next` function.
    type OUT = Out;
}
#[async_trait::async_trait]
impl<NXT, F, P> Pipeline for Connect<P, F>
where
P: Pipeline + Sync + Send,
F: Fn(P::OUT) -> NXT + Sync + Send,
NXT: Send + Sync,
{
type IN = P::IN;
async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
let out = self.prev.process(input).await?;
Ok((self.next)(out))
}
}
impl<NXT, F, P> Linkable for Connect<P, ErrorFuc<F>>
where
F: Fn(P::OUT) -> Result<NXT, Error> + Send + Sync,
NXT: Send + Sync,
P: Linkable + Send + Sync,
{
type OUT = NXT;
}
#[async_trait::async_trait]
impl<NXT, F, P> Pipeline for Connect<P, ErrorFuc<F>>
where
P: Pipeline + Sync + Send,
F: Fn(P::OUT) -> Result<NXT, Error> + Sync + Send,
NXT: Send + Sync,
{
type IN = P::IN;
async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
let out = self.prev.process(input).await?;
(self.next.f)(out)
}
}
#[async_trait::async_trait]
impl<NXT, F, IN> Pipeline for Connect<Start<IN>, ErrorFuc<F>>
where
IN: Sync + Send,
F: Fn(IN) -> Result<NXT, Error> + Sync + Send,
NXT: Send + Sync,
{
type IN = IN;
async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
(self.next.f)(input)
}
}
#[async_trait::async_trait]
impl<NXT, F, IN> Pipeline for Connect<Start<IN>, F>
where
F: Fn(IN) -> NXT + Send + Sync,
NXT: Send + Sync,
IN: Send + Sync,
{
type IN = IN;
async fn process(self: &Self, input: IN) -> Result<NXT, Error> {
Ok((self.next)(input))
}
}