1mod abort;
2mod and_then;
3mod chain;
4mod finally;
5mod flatten;
6mod map;
7mod maybe;
8mod optional;
9mod pass;
10mod spawn;
11mod stub;
12
13use std::marker::PhantomData;
14
15use futures::{Stream, StreamExt, TryFuture};
16
17pub trait Service<In> {
18 type Out;
19
20 fn handle(self, input: impl Stream<Item = In> + Send) -> impl Stream<Item = Self::Out> + Send;
21}
22
23pub trait ServiceExt<I>: Service<I> {
24 #[inline]
25 fn flow<U>(self, service: U) -> impl Service<I, Out = U::Out>
26 where
27 Self: Sized,
28 U: Service<Self::Out>,
29 {
30 chain::Chain {
31 service1: self,
32 service2: service,
33 }
34 }
35
36 #[inline]
37 fn maybe_flow<C, S>(self, cond: C, service: S) -> impl Service<I, Out = Self::Out>
38 where
39 I: Send,
40 C: Send + Sync + Fn(&Self::Out) -> bool,
41 Self::Out: std::fmt::Debug + Send,
42 Self: Sized,
43 S: Service<Self::Out, Out = Self::Out>,
44 {
45 self.flow(maybe::Maybe { cond, service })
46 }
47
48 #[inline]
49 fn spawn(self, buffer: usize) -> spawn::Spawn<Self>
50 where
51 Self: Sized,
52 {
53 spawn::Spawn {
54 service: self,
55 buffer,
56 }
57 }
58
59 #[inline]
60 fn map<U, F>(self, map: F) -> impl Service<I, Out = U>
61 where
62 Self: Sized,
63 F: Send + FnMut(Self::Out) -> U,
64 {
65 self.flow(map::MapEachFn {
66 map,
67 m: PhantomData,
68 })
69 }
70
71 #[inline]
72 fn abort_token(self, token: impl futures::Future + Send) -> impl Service<I, Out = Self::Out>
73 where
74 Self: Sized,
75 {
76 abort::AbortFn {
77 token,
78 service: self,
79 }
80 }
81}
82
83impl<I, T: Service<I>> ServiceExt<I> for T {}
84
85pub trait TryService<I, E>: Service<Result<I, E>, Out = Result<Self::Ok, Self::Error>> {
86 type Ok;
87 type Error;
88
89 fn try_handle(
90 self,
91 input: impl Stream<Item = Result<I, E>> + Send,
92 ) -> impl Stream<Item = Result<Self::Ok, Self::Error>> + Send;
93}
94
95impl<S, I, IE, O, OE> TryService<I, IE> for S
96where
97 S: Service<Result<I, IE>, Out = Result<O, OE>>,
98{
99 type Ok = O;
100 type Error = OE;
101
102 fn try_handle(
103 self,
104 input: impl Stream<Item = Result<I, IE>> + Send,
105 ) -> impl Stream<Item = Result<Self::Ok, Self::Error>> + Send {
106 self.handle(input)
107 }
108}
109
110pub trait TryServiceExt<I, E>: TryService<I, E> {
111 #[inline]
112 fn map_ok<C: FnMut(Self::Ok) -> U + Send, U>(
113 self,
114 map: C,
115 ) -> impl Service<Result<I, E>, Out = Result<U, Self::Error>>
116 where
117 Self: Sized,
118 {
119 self.flow(map::MapOk {
120 map,
121 m: PhantomData,
122 })
123 }
124
125 #[inline]
126 fn and_then<C: FnMut(Self::Ok) -> F + Send, F>(
127 self,
128 f: C,
129 ) -> impl Service<Result<I, E>, Out = Result<F::Ok, Self::Error>>
130 where
131 Self: Sized,
132 F: TryFuture<Error = Self::Error> + Send,
133 {
134 self.flow(and_then::AndThenFn { f })
135 }
136
137 #[inline]
138 fn finally<C, F>(self, f: C) -> impl Service<Result<I, E>, Out = Self::Out>
139 where
140 Self: Sized,
141 Self::Ok: Send,
142 Self::Error: Send,
143 F: Future<Output = Result<(), Self::Error>> + Send,
144 C: Send + FnMut() -> F,
145 {
146 self.flow(finally::Finally { f })
147 }
148
149 #[inline]
150 fn except<C, F>(self, f: C) -> impl Service<Result<I, E>, Out = Self::Out>
151 where
152 Self: Sized,
153 Self::Ok: Send,
154 Self::Error: Send,
155 F: Future<Output = Result<(), Self::Error>> + Send,
156 C: Send + FnMut(Self::Error) -> F,
157 {
158 self.flow(finally::Except { f })
159 }
160
161 #[inline]
162 fn stub(self) -> impl Service<Result<I, E>, Out = Result<(), Self::Error>>
163 where
164 Self: Sized,
165 Self::Ok: Send,
166 Self::Error: Send,
167 {
168 self.flow(stub::Stub)
169 }
170
171 fn try_flatten_map<C, O, F, S>(
172 self,
173 f: C,
174 ) -> impl Service<Result<I, E>, Out = Result<O, Self::Error>>
175 where
176 Self: Sized,
177 Self::Ok: Send,
178 Self::Error: Send,
179 O: Send,
180 E: Send,
181 I: Send,
182 F: Future<Output = Result<S, Self::Error>> + Send,
183 C: FnMut(Self::Ok) -> F + Send,
184 S: Stream<Item = Result<O, Self::Error>> + Send,
185 {
186 self.flow(flatten::TryFlattenMap { f })
187 }
188}
189
190impl<I, S: Service<I, Out = I>> Service<I> for Option<S> {
191 type Out = I;
192
193 fn handle(self, input: impl Stream<Item = I> + Send) -> impl Stream<Item = Self::Out> + Send {
194 if let Some(srv) = self {
195 srv.handle(input).left_stream()
196 } else {
197 input.right_stream()
198 }
199 }
200}
201
202pub fn flow() -> pass::Pass {
203 pass::Pass
204}