1mod abort;
2mod and_then;
3mod chain;
4mod finally;
5mod flatten;
6mod map;
7mod maybe;
8mod optional;
9mod pass;
10mod spawn;
11mod stub;
12
13use std::marker::PhantomData;
14
15use flowly_core::Either;
16use futures::{Stream, StreamExt, TryFuture};
17
18pub trait Service<In> {
19 type Out;
20
21 fn handle(self, input: impl Stream<Item = In> + Send) -> impl Stream<Item = Self::Out> + Send;
22}
23
24pub trait ServiceExt<I>: Service<I> {
25 #[inline]
26 fn flow<U>(self, service: U) -> impl Service<I, Out = U::Out>
27 where
28 Self: Sized,
29 U: Service<Self::Out>,
30 {
31 chain::Chain {
32 service1: self,
33 service2: service,
34 }
35 }
36
37 #[inline]
38 fn spawn(self, buffer: usize) -> spawn::Spawn<Self>
39 where
40 Self: Sized,
41 {
42 spawn::Spawn {
43 service: self,
44 buffer,
45 }
46 }
47
48 #[inline]
49 fn map<U, F>(self, map: F) -> impl Service<I, Out = U>
50 where
51 Self: Sized,
52 F: Send + FnMut(Self::Out) -> U,
53 {
54 self.flow(map::MapEachFn {
55 map,
56 m: PhantomData,
57 })
58 }
59
60 #[inline]
61 fn abort_token(self, token: impl futures::Future + Send) -> impl Service<I, Out = Self::Out>
62 where
63 Self: Sized,
64 {
65 abort::AbortFn {
66 token,
67 service: self,
68 }
69 }
70}
71
72impl<I, T: Service<I>> ServiceExt<I> for T {}
73
74pub trait TryService<I, E>: Service<Result<I, E>, Out = Result<Self::Ok, Self::Error>> {
75 type Ok;
76 type Error;
77
78 fn try_handle(
79 self,
80 input: impl Stream<Item = Result<I, E>> + Send,
81 ) -> impl Stream<Item = Result<Self::Ok, Self::Error>> + Send;
82}
83
84impl<S, I, IE, O, OE> TryService<I, IE> for S
85where
86 S: Service<Result<I, IE>, Out = Result<O, OE>>,
87{
88 type Ok = O;
89 type Error = OE;
90
91 fn try_handle(
92 self,
93 input: impl Stream<Item = Result<I, IE>> + Send,
94 ) -> impl Stream<Item = Result<Self::Ok, Self::Error>> + Send {
95 self.handle(input)
96 }
97}
98
99pub trait TryServiceExt<I, E>: TryService<I, E> {
100 #[inline]
101 fn maybe_flow<OE, C, S>(
102 self,
103 cond: C,
104 service: S,
105 ) -> impl Service<Result<I, E>, Out = Result<Self::Ok, Either<Self::Error, OE>>>
106 where
107 E: Send,
108 OE: Send,
109 Self::Ok: Send,
110 Self::Error: Send,
111 Self: Sized,
112 I: Send + std::fmt::Debug,
113 C: Send + Sync + Fn(&Result<Self::Ok, Self::Error>) -> bool,
114 S: Service<Result<Self::Ok, Self::Error>, Out = Result<Self::Ok, OE>>,
115 {
116 self.flow(maybe::Maybe { cond, service })
117 }
118
119 #[inline]
120 fn map_ok<C: FnMut(Self::Ok) -> U + Send, U>(
121 self,
122 map: C,
123 ) -> impl Service<Result<I, E>, Out = Result<U, Self::Error>>
124 where
125 Self: Sized,
126 {
127 self.flow(map::MapOk {
128 map,
129 m: PhantomData,
130 })
131 }
132
133 #[inline]
134 fn and_then<C: FnMut(Self::Ok) -> F + Send, F>(
135 self,
136 f: C,
137 ) -> impl Service<Result<I, E>, Out = Result<F::Ok, Self::Error>>
138 where
139 Self: Sized,
140 F: TryFuture<Error = Self::Error> + Send,
141 {
142 self.flow(and_then::AndThenFn { f })
143 }
144
145 #[inline]
146 fn finally<C, F>(self, f: C) -> impl Service<Result<I, E>, Out = Self::Out>
147 where
148 Self: Sized,
149 Self::Ok: Send,
150 Self::Error: Send,
151 F: Future<Output = Result<(), Self::Error>> + Send,
152 C: Send + FnMut() -> F,
153 {
154 self.flow(finally::Finally { f })
155 }
156
157 #[inline]
158 fn except<C, F>(self, f: C) -> impl Service<Result<I, E>, Out = Self::Out>
159 where
160 Self: Sized,
161 Self::Ok: Send,
162 Self::Error: Send,
163 F: Future<Output = Result<(), Self::Error>> + Send,
164 C: Send + FnMut(Self::Error) -> F,
165 {
166 self.flow(finally::Except { f })
167 }
168
169 #[inline]
170 fn stub(self) -> impl Service<Result<I, E>, Out = Result<(), Self::Error>>
171 where
172 Self: Sized,
173 Self::Ok: Send,
174 Self::Error: Send,
175 {
176 self.flow(stub::Stub)
177 }
178
179 fn try_flatten_map<C, O, F, S>(
180 self,
181 f: C,
182 ) -> impl Service<Result<I, E>, Out = Result<O, Self::Error>>
183 where
184 Self: Sized,
185 Self::Ok: Send,
186 Self::Error: Send,
187 O: Send,
188 E: Send,
189 I: Send,
190 F: Future<Output = Result<S, Self::Error>> + Send,
191 C: FnMut(Self::Ok) -> F + Send,
192 S: Stream<Item = Result<O, Self::Error>> + Send,
193 {
194 self.flow(flatten::TryFlattenMap { f })
195 }
196}
197
198impl<I, E, T: TryService<I, E>> TryServiceExt<I, E> for T {}
199
200impl<I, S: Service<I, Out = I>> Service<I> for Option<S> {
201 type Out = I;
202
203 fn handle(self, input: impl Stream<Item = I> + Send) -> impl Stream<Item = Self::Out> + Send {
204 if let Some(srv) = self {
205 srv.handle(input).left_stream()
206 } else {
207 input.right_stream()
208 }
209 }
210}
211
212pub fn flow<I>() -> pass::Pass<I> {
213 pass::Pass(PhantomData)
214}