async_pipeline/
async_connect.rs1use crate::link::{Error, ErrorFuc, Linkable, Pipeline};
2use crate::Start;
3use std::future::Future;
4
/// A pipeline link that pairs an upstream stage with an asynchronous step.
///
/// * `prev` - the stage (or start marker) that comes before this link.
/// * `next` - the value invoked by this link's `Pipeline` impls; either a
///   plain closure or an `ErrorFuc` wrapper around one.
///
/// Both fields are crate-private, so the struct can only be built inside
/// this crate.
pub struct AsyncConnect<P, F> {
    /// Upstream part of the chain.
    pub(crate) prev: P,
    /// Step applied by this link.
    pub(crate) next: F,
}
9
10impl<NXT, Fut, F, L> Linkable for AsyncConnect<L, F>
11where
12 F: 'static + Fn(L::OUT) -> Fut + Send + Sync,
13 Fut: 'static + Future<Output = NXT> + Send + Sync,
14 NXT: Send + Sync,
15 L: Linkable + Send + Sync,
16{
17 type OUT = NXT;
18}
19
20#[async_trait::async_trait]
21impl<NXT, Fut, F, P> Pipeline for AsyncConnect<P, F>
22where
23 P: Pipeline + Sync + Send,
24 F: Fn(P::OUT) -> Fut + 'static + Send + Sync,
25 Fut: Future<Output = NXT> + Send + Sync + 'static,
26 NXT: Send + Sync,
27{
28 type IN = P::IN;
29 async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
30 let out = self.prev.process(input).await?;
31 Ok((self.next)(out).await)
32 }
33}
34
35#[async_trait::async_trait]
36impl<NXT, Fut, F, IN> Pipeline for AsyncConnect<Start<IN>, F>
37where
38 F: Fn(IN) -> Fut + 'static + Send + Sync,
39 Fut: Future<Output = NXT> + Send + Sync + 'static,
40 NXT: Send + Sync,
41 IN: Send + Sync,
42{
43 type IN = IN;
44 async fn process(self: &Self, input: IN) -> Result<NXT, Error> {
45 Ok((self.next)(input).await)
46 }
47}
48
49impl<NXT, Fut, F, L> Linkable for AsyncConnect<L, ErrorFuc<F>>
50where
51 F: Fn(L::OUT) -> Fut + Send + Sync,
52 Fut: Future<Output = Result<NXT, Error>> + Send + Sync + 'static,
53 NXT: Send + Sync,
54 L: Linkable + Send + Sync,
55{
56 type OUT = NXT;
57}
58
59#[async_trait::async_trait]
60impl<NXT, Fut, F, P> Pipeline for AsyncConnect<P, ErrorFuc<F>>
61where
62 F: Fn(P::OUT) -> Fut + Send + Sync,
63 Fut: Future<Output = Result<NXT, Error>> + Send + Sync + 'static,
64 NXT: Send + Sync,
65 P: Pipeline + Send + Sync,
66{
67 type IN = P::IN;
68 async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
69 let out = self.prev.process(input).await?;
70 (self.next.f)(out).await
71 }
72}
73
74#[async_trait::async_trait]
75impl<NXT, Fut, F, IN> Pipeline for AsyncConnect<Start<IN>, ErrorFuc<F>>
76where
77 IN: Sync + Send,
78 F: Fn(IN) -> Fut + Sync + Send,
79 Fut: Future<Output = Result<NXT, Error>> + Send + Sync + 'static,
80 NXT: Send + Sync,
81{
82 type IN = IN;
83 async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
84 (self.next.f)(input).await
85 }
86}