flowly_service/
lib.rs

1mod abort;
2mod and_then;
3mod chain;
4mod finally;
5mod flatten;
6mod map;
7mod maybe;
8mod optional;
9mod pass;
10mod spawn;
11mod stub;
12
13use std::marker::PhantomData;
14
15use flowly_core::Either;
16use futures::{Stream, StreamExt, TryFuture};
17
18pub trait Service<In> {
19    type Out;
20
21    fn handle(self, input: impl Stream<Item = In> + Send) -> impl Stream<Item = Self::Out> + Send;
22}
23
24pub trait ServiceExt<I>: Service<I> {
25    #[inline]
26    fn flow<U>(self, service: U) -> impl Service<I, Out = U::Out>
27    where
28        Self: Sized,
29        U: Service<Self::Out>,
30    {
31        chain::Chain {
32            service1: self,
33            service2: service,
34        }
35    }
36
37    #[inline]
38    fn spawn(self, buffer: usize) -> spawn::Spawn<Self>
39    where
40        Self: Sized,
41    {
42        spawn::Spawn {
43            service: self,
44            buffer,
45        }
46    }
47
48    #[inline]
49    fn map<U, F>(self, map: F) -> impl Service<I, Out = U>
50    where
51        Self: Sized,
52        F: Send + FnMut(Self::Out) -> U,
53    {
54        self.flow(map::MapEachFn {
55            map,
56            m: PhantomData,
57        })
58    }
59
60    #[inline]
61    fn abort_token(self, token: impl futures::Future + Send) -> impl Service<I, Out = Self::Out>
62    where
63        Self: Sized,
64    {
65        abort::AbortFn {
66            token,
67            service: self,
68        }
69    }
70}
71
72impl<I, T: Service<I>> ServiceExt<I> for T {}
73
74pub trait TryService<I, E>: Service<Result<I, E>, Out = Result<Self::Ok, Self::Error>> {
75    type Ok;
76    type Error;
77
78    fn try_handle(
79        self,
80        input: impl Stream<Item = Result<I, E>> + Send,
81    ) -> impl Stream<Item = Result<Self::Ok, Self::Error>> + Send;
82}
83
84impl<S, I, IE, O, OE> TryService<I, IE> for S
85where
86    S: Service<Result<I, IE>, Out = Result<O, OE>>,
87{
88    type Ok = O;
89    type Error = OE;
90
91    fn try_handle(
92        self,
93        input: impl Stream<Item = Result<I, IE>> + Send,
94    ) -> impl Stream<Item = Result<Self::Ok, Self::Error>> + Send {
95        self.handle(input)
96    }
97}
98
99pub trait TryServiceExt<I, E>: TryService<I, E> {
100    #[inline]
101    fn maybe_flow<OE, C, S>(
102        self,
103        cond: C,
104        service: S,
105    ) -> impl Service<Result<I, E>, Out = Result<Self::Ok, Either<Self::Error, OE>>>
106    where
107        E: Send,
108        OE: Send,
109        Self::Ok: Send,
110        Self::Error: Send,
111        Self: Sized,
112        I: Send + std::fmt::Debug,
113        C: Send + Sync + FnMut(&Result<Self::Ok, Self::Error>) -> bool,
114        S: Service<Result<Self::Ok, Self::Error>, Out = Result<Self::Ok, OE>>,
115    {
116        self.flow(maybe::Maybe { cond, service })
117    }
118
119    #[inline]
120    fn map_ok<C: FnMut(Self::Ok) -> U + Send, U>(
121        self,
122        map: C,
123    ) -> impl Service<Result<I, E>, Out = Result<U, Self::Error>>
124    where
125        Self: Sized,
126    {
127        self.flow(map::MapOk {
128            map,
129            m: PhantomData,
130        })
131    }
132
133    #[inline]
134    fn and_then<C: FnMut(Self::Ok) -> F + Send, F>(
135        self,
136        f: C,
137    ) -> impl Service<Result<I, E>, Out = Result<F::Ok, Self::Error>>
138    where
139        Self: Sized,
140        F: TryFuture<Error = Self::Error> + Send,
141    {
142        self.flow(and_then::AndThenFn { f })
143    }
144
145    #[inline]
146    fn finally<C, F>(self, f: C) -> impl Service<Result<I, E>, Out = Self::Out>
147    where
148        Self: Sized,
149        Self::Ok: Send,
150        Self::Error: Send,
151        F: Future<Output = Result<(), Self::Error>> + Send,
152        C: Send + FnMut() -> F,
153    {
154        self.flow(finally::Finally { f })
155    }
156
157    #[inline]
158    fn except<C, F>(self, f: C) -> impl Service<Result<I, E>, Out = Self::Out>
159    where
160        Self: Sized,
161        Self::Ok: Send,
162        Self::Error: Send,
163        F: Future<Output = Result<(), Self::Error>> + Send,
164        C: Send + FnMut(Self::Error) -> F,
165    {
166        self.flow(finally::Except { f })
167    }
168
169    #[inline]
170    fn stub(self) -> impl Service<Result<I, E>, Out = Result<(), Self::Error>>
171    where
172        Self: Sized,
173        Self::Ok: Send,
174        Self::Error: Send,
175    {
176        self.flow(stub::Stub)
177    }
178
179    fn try_flatten_map<C, O, F, S>(
180        self,
181        f: C,
182    ) -> impl Service<Result<I, E>, Out = Result<O, Self::Error>>
183    where
184        Self: Sized,
185        Self::Ok: Send,
186        Self::Error: Send,
187        O: Send,
188        E: Send,
189        I: Send,
190        F: Future<Output = Result<S, Self::Error>> + Send,
191        C: FnMut(Self::Ok) -> F + Send,
192        S: Stream<Item = Result<O, Self::Error>> + Send,
193    {
194        self.flow(flatten::TryFlattenMap { f })
195    }
196}
197
198impl<I, E, T: TryService<I, E>> TryServiceExt<I, E> for T {}
199
200impl<I, S: Service<I, Out = I>> Service<I> for Option<S> {
201    type Out = I;
202
203    fn handle(self, input: impl Stream<Item = I> + Send) -> impl Stream<Item = Self::Out> + Send {
204        if let Some(srv) = self {
205            srv.handle(input).left_stream()
206        } else {
207            input.right_stream()
208        }
209    }
210}
211
212pub fn flow<I>() -> pass::Pass<I> {
213    pass::Pass(PhantomData)
214}