// flowly_service/lib.rs

1mod abort;
2mod and_then;
3mod chain;
4mod finally;
5mod flatten;
6mod map;
7mod maybe;
8mod optional;
9mod pass;
10mod spawn;
11mod stub;
12
13use std::marker::PhantomData;
14
15use futures::{Stream, StreamExt, TryFuture};
16
17pub trait Service<In> {
18    type Out;
19
20    fn handle(self, input: impl Stream<Item = In> + Send) -> impl Stream<Item = Self::Out> + Send;
21}
22
23pub trait ServiceExt<I>: Service<I> {
24    #[inline]
25    fn flow<U>(self, service: U) -> impl Service<I, Out = U::Out>
26    where
27        Self: Sized,
28        U: Service<Self::Out>,
29    {
30        chain::Chain {
31            service1: self,
32            service2: service,
33        }
34    }
35
36    #[inline]
37    fn maybe_flow<C, S>(self, cond: C, service: S) -> impl Service<I, Out = Self::Out>
38    where
39        I: Send,
40        C: Send + Sync + Fn(&Self::Out) -> bool,
41        Self::Out: std::fmt::Debug + Send,
42        Self: Sized,
43        S: Service<Self::Out, Out = Self::Out>,
44    {
45        self.flow(maybe::Maybe { cond, service })
46    }
47
48    #[inline]
49    fn spawn(self, buffer: usize) -> spawn::Spawn<Self>
50    where
51        Self: Sized,
52    {
53        spawn::Spawn {
54            service: self,
55            buffer,
56        }
57    }
58
59    #[inline]
60    fn map<U, F>(self, map: F) -> impl Service<I, Out = U>
61    where
62        Self: Sized,
63        F: Send + FnMut(Self::Out) -> U,
64    {
65        self.flow(map::MapEachFn {
66            map,
67            m: PhantomData,
68        })
69    }
70
71    #[inline]
72    fn abort_token(self, token: impl futures::Future + Send) -> impl Service<I, Out = Self::Out>
73    where
74        Self: Sized,
75    {
76        abort::AbortFn {
77            token,
78            service: self,
79        }
80    }
81}
82
83impl<I, T: Service<I>> ServiceExt<I> for T {}
84
85pub trait TryService<I, E>: Service<Result<I, E>, Out = Result<Self::Ok, Self::Error>> {
86    type Ok;
87    type Error;
88
89    fn try_handle(
90        self,
91        input: impl Stream<Item = Result<I, E>> + Send,
92    ) -> impl Stream<Item = Result<Self::Ok, Self::Error>> + Send;
93}
94
95impl<S, I, IE, O, OE> TryService<I, IE> for S
96where
97    S: Service<Result<I, IE>, Out = Result<O, OE>>,
98{
99    type Ok = O;
100    type Error = OE;
101
102    fn try_handle(
103        self,
104        input: impl Stream<Item = Result<I, IE>> + Send,
105    ) -> impl Stream<Item = Result<Self::Ok, Self::Error>> + Send {
106        self.handle(input)
107    }
108}
109
110pub trait TryServiceExt<I, E>: TryService<I, E> {
111    #[inline]
112    fn map_ok<C: FnMut(Self::Ok) -> U + Send, U>(
113        self,
114        map: C,
115    ) -> impl Service<Result<I, E>, Out = Result<U, Self::Error>>
116    where
117        Self: Sized,
118    {
119        self.flow(map::MapOk {
120            map,
121            m: PhantomData,
122        })
123    }
124
125    #[inline]
126    fn and_then<C: FnMut(Self::Ok) -> F + Send, F>(
127        self,
128        f: C,
129    ) -> impl Service<Result<I, E>, Out = Result<F::Ok, Self::Error>>
130    where
131        Self: Sized,
132        F: TryFuture<Error = Self::Error> + Send,
133    {
134        self.flow(and_then::AndThenFn { f })
135    }
136
137    #[inline]
138    fn finally<C, F>(self, f: C) -> impl Service<Result<I, E>, Out = Self::Out>
139    where
140        Self: Sized,
141        Self::Ok: Send,
142        Self::Error: Send,
143        F: Future<Output = Result<(), Self::Error>> + Send,
144        C: Send + FnMut() -> F,
145    {
146        self.flow(finally::Finally { f })
147    }
148
149    #[inline]
150    fn except<C, F>(self, f: C) -> impl Service<Result<I, E>, Out = Self::Out>
151    where
152        Self: Sized,
153        Self::Ok: Send,
154        Self::Error: Send,
155        F: Future<Output = Result<(), Self::Error>> + Send,
156        C: Send + FnMut(Self::Error) -> F,
157    {
158        self.flow(finally::Except { f })
159    }
160
161    #[inline]
162    fn stub(self) -> impl Service<Result<I, E>, Out = Result<(), Self::Error>>
163    where
164        Self: Sized,
165        Self::Ok: Send,
166        Self::Error: Send,
167    {
168        self.flow(stub::Stub)
169    }
170
171    fn try_flatten_map<C, O, F, S>(
172        self,
173        f: C,
174    ) -> impl Service<Result<I, E>, Out = Result<O, Self::Error>>
175    where
176        Self: Sized,
177        Self::Ok: Send,
178        Self::Error: Send,
179        O: Send,
180        E: Send,
181        I: Send,
182        F: Future<Output = Result<S, Self::Error>> + Send,
183        C: FnMut(Self::Ok) -> F + Send,
184        S: Stream<Item = Result<O, Self::Error>> + Send,
185    {
186        self.flow(flatten::TryFlattenMap { f })
187    }
188}
189
190impl<I, S: Service<I, Out = I>> Service<I> for Option<S> {
191    type Out = I;
192
193    fn handle(self, input: impl Stream<Item = I> + Send) -> impl Stream<Item = Self::Out> + Send {
194        if let Some(srv) = self {
195            srv.handle(input).left_stream()
196        } else {
197            input.right_stream()
198        }
199    }
200}
201
202pub fn flow() -> pass::Pass {
203    pass::Pass
204}