async_pipeline/
async_connect.rs

1use crate::link::{Error, ErrorFuc, Linkable, Pipeline};
2use crate::Start;
3use std::future::Future;
4
/// Pipeline stage that chains an upstream link with an async function.
///
/// The trait impls in this file decide how the two halves are used: `prev` is
/// either another stage or a `Start` marker, and `next` is either a plain
/// async function or one wrapped in `ErrorFuc`.
pub struct AsyncConnect<Prev, Next> {
    /// Upstream link; when it is a pipeline stage, its output becomes the
    /// argument of `next`.
    pub(crate) prev: Prev,
    /// Function invoked by this stage after `prev`.
    pub(crate) next: Next,
}
9
10impl<NXT, Fut, F, L> Linkable for AsyncConnect<L, F>
11where
12    F: 'static + Fn(L::OUT) -> Fut + Send + Sync,
13    Fut: 'static + Future<Output = NXT> + Send + Sync,
14    NXT: Send + Sync,
15    L: Linkable + Send + Sync,
16{
17    type OUT = NXT;
18}
19
20#[async_trait::async_trait]
21impl<NXT, Fut, F, P> Pipeline for AsyncConnect<P, F>
22where
23    P: Pipeline + Sync + Send,
24    F: Fn(P::OUT) -> Fut + 'static + Send + Sync,
25    Fut: Future<Output = NXT> + Send + Sync + 'static,
26    NXT: Send + Sync,
27{
28    type IN = P::IN;
29    async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
30        let out = self.prev.process(input).await?;
31        Ok((self.next)(out).await)
32    }
33}
34
35#[async_trait::async_trait]
36impl<NXT, Fut, F, IN> Pipeline for AsyncConnect<Start<IN>, F>
37where
38    F: Fn(IN) -> Fut + 'static + Send + Sync,
39    Fut: Future<Output = NXT> + Send + Sync + 'static,
40    NXT: Send + Sync,
41    IN: Send + Sync,
42{
43    type IN = IN;
44    async fn process(self: &Self, input: IN) -> Result<NXT, Error> {
45        Ok((self.next)(input).await)
46    }
47}
48
49impl<NXT, Fut, F, L> Linkable for AsyncConnect<L, ErrorFuc<F>>
50where
51    F: Fn(L::OUT) -> Fut + Send + Sync,
52    Fut: Future<Output = Result<NXT, Error>> + Send + Sync + 'static,
53    NXT: Send + Sync,
54    L: Linkable + Send + Sync,
55{
56    type OUT = NXT;
57}
58
59#[async_trait::async_trait]
60impl<NXT, Fut, F, P> Pipeline for AsyncConnect<P, ErrorFuc<F>>
61where
62    F: Fn(P::OUT) -> Fut + Send + Sync,
63    Fut: Future<Output = Result<NXT, Error>> + Send + Sync + 'static,
64    NXT: Send + Sync,
65    P: Pipeline + Send + Sync,
66{
67    type IN = P::IN;
68    async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
69        let out = self.prev.process(input).await?;
70        (self.next.f)(out).await
71    }
72}
73
74#[async_trait::async_trait]
75impl<NXT, Fut, F, IN> Pipeline for AsyncConnect<Start<IN>, ErrorFuc<F>>
76where
77    IN: Sync + Send,
78    F: Fn(IN) -> Fut + Sync + Send,
79    Fut: Future<Output = Result<NXT, Error>> + Send + Sync + 'static,
80    NXT: Send + Sync,
81{
82    type IN = IN;
83    async fn process(self: &Self, input: Self::IN) -> Result<NXT, Error> {
84        (self.next.f)(input).await
85    }
86}