1use crate::errors::CatBridgeError;
5use std::{
6 marker::PhantomData,
7 pin::Pin,
8 task::{Context, Poll},
9};
10use tower::Service;
11
12#[cfg(feature = "clients")]
13use crate::net::client::models::{FromRequestStreamEvent, RequestStreamEvent};
14#[cfg(feature = "servers")]
15use crate::net::server::models::{FromResponseStreamEvent, ResponseStreamEvent};
16
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
/// A callback that is handed a [`RequestStreamEvent`] and asynchronously
/// produces a `Result<bool, CatBridgeError>`.
///
/// `ParamTy` does not appear in the method signature. The implementations in
/// this module use it purely as a marker so that callables with different
/// argument lists can each get their own, non-overlapping implementation.
///
/// The meaning of the returned `bool` is decided by whoever drives the
/// handler; it is not defined in this module.
#[cfg(feature = "clients")]
pub trait OnRequestStreamBeginHandler<ParamTy, State>
where
    State: Clone + Send + Sync + 'static,
{
    /// The future returned by [`call`](Self::call).
    type Future: Future<Output = Result<bool, CatBridgeError>> + Send + 'static;

    /// Consumes the handler and runs it against `event`.
    fn call(self, event: RequestStreamEvent<State>) -> Self::Future;
}
33
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
/// Lets an async callable that takes no arguments act as a request stream
/// handler. The event passed to `call` is not forwarded to the callable.
#[cfg(feature = "clients")]
impl<F, Fut, Out, State> OnRequestStreamBeginHandler<(), State> for F
where
    F: FnOnce() -> Fut + Clone + Send + 'static,
    Fut: Future<Output = Out> + Send,
    Out: Into<Result<bool, CatBridgeError>>,
    State: Clone + Send + Sync + 'static,
{
    type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

    fn call(self, _event: RequestStreamEvent<State>) -> Self::Future {
        Box::pin(async move {
            // Run the callable first, then normalise whatever it returned.
            let outcome = self().await;
            outcome.into()
        })
    }
}
51
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
/// Lets an async callable taking a single argument act as a request stream
/// handler, provided that argument can be built from the whole
/// [`RequestStreamEvent`] through `From`.
#[cfg(feature = "clients")]
impl<F, Fut, Out, State, Arg> OnRequestStreamBeginHandler<Arg, State> for F
where
    F: FnOnce(Arg) -> Fut + Clone + Send + 'static,
    Fut: Future<Output = Out> + Send,
    Out: Into<Result<bool, CatBridgeError>>,
    Arg: From<RequestStreamEvent<State>> + Send,
    State: Clone + Send + Sync + 'static,
{
    type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

    fn call(self, event: RequestStreamEvent<State>) -> Self::Future {
        Box::pin(async move {
            // The conversion happens lazily, when the future is first polled.
            let converted = Arg::from(event);
            let outcome = self(converted).await;
            outcome.into()
        })
    }
}
70
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
/// A callback that is handed a [`ResponseStreamEvent`] and asynchronously
/// produces a `Result<bool, CatBridgeError>`.
///
/// `ParamTy` does not appear in the method signature. The implementations in
/// this module use it purely as a marker so that callables with different
/// argument lists can each get their own, non-overlapping implementation.
///
/// The meaning of the returned `bool` is decided by whoever drives the
/// handler; it is not defined in this module.
#[cfg(feature = "servers")]
pub trait OnResponseStreamBeginHandler<ParamTy, State>
where
    State: Clone + Send + Sync + 'static,
{
    /// The future returned by [`call`](Self::call).
    type Future: Future<Output = Result<bool, CatBridgeError>> + Send + 'static;

    /// Consumes the handler and runs it against `event`.
    fn call(self, event: ResponseStreamEvent<State>) -> Self::Future;
}
87
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
/// Lets an async callable that takes no arguments act as a response stream
/// handler. The event passed to `call` is not forwarded to the callable.
#[cfg(feature = "servers")]
impl<F, Fut, Out, State> OnResponseStreamBeginHandler<(), State> for F
where
    F: FnOnce() -> Fut + Clone + Send + 'static,
    Fut: Future<Output = Out> + Send,
    Out: Into<Result<bool, CatBridgeError>>,
    State: Clone + Send + Sync + 'static,
{
    type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

    fn call(self, _event: ResponseStreamEvent<State>) -> Self::Future {
        Box::pin(async move {
            // Run the callable first, then normalise whatever it returned.
            let outcome = self().await;
            outcome.into()
        })
    }
}
105
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
/// Lets an async callable taking a single argument act as a response stream
/// handler, provided that argument can be built from the whole
/// [`ResponseStreamEvent`] through `From`.
#[cfg(feature = "servers")]
impl<F, Fut, Out, State, Arg> OnResponseStreamBeginHandler<Arg, State> for F
where
    F: FnOnce(Arg) -> Fut + Clone + Send + 'static,
    Fut: Future<Output = Out> + Send,
    Out: Into<Result<bool, CatBridgeError>>,
    Arg: From<ResponseStreamEvent<State>> + Send,
    State: Clone + Send + Sync + 'static,
{
    type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

    fn call(self, event: ResponseStreamEvent<State>) -> Self::Future {
        Box::pin(async move {
            // The conversion happens lazily, when the future is first polled.
            let converted = Arg::from(event);
            let outcome = self(converted).await;
            outcome.into()
        })
    }
}
124
// Generates, for one argument list, the handler implementations for async
// callables whose every argument is extracted from the stream event.
//
// The matcher takes the leading argument types in brackets and the final
// argument type separately, so at least one argument is always present.
// Each argument is produced by awaiting `from_stream_event` on a mutable
// borrow of the event, left to right; the first extraction error is returned
// (via `?`) without invoking the callable.
macro_rules! fn_to_on_connection_handler {
    (
        [$($ty:ident),*], $last:ident
    ) => {
        // The type parameter names double as local variable names below,
        // hence `non_snake_case`.
        #[allow(non_snake_case, unused_mut)]
        #[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
        #[cfg(feature = "clients")]
        impl<F, Fut, Out, State, $($ty,)* $last> OnRequestStreamBeginHandler<($($ty,)* $last,), State> for F
        where
            F: FnOnce($($ty,)* $last,) -> Fut + Clone + Send + 'static,
            Fut: Future<Output = Out> + Send,
            Out: Into<Result<bool, CatBridgeError>>,
            State: Clone + Send + Sync + 'static,
            $( $ty: FromRequestStreamEvent<State> + Send, )*
            $last: FromRequestStreamEvent<State> + Send,
        {
            type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

            fn call(self, mut event: RequestStreamEvent<State>) -> Self::Future {
                Box::pin(async move {
                    $(
                        let $ty = $ty::from_stream_event(&mut event).await?;
                    )*
                    let $last = $last::from_stream_event(&mut event).await?;
                    let outcome = self($($ty,)* $last).await;
                    outcome.into()
                })
            }
        }

        // Same shape as above, for the server-side event and extractor trait.
        #[allow(non_snake_case, unused_mut)]
        #[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
        #[cfg(feature = "servers")]
        impl<F, Fut, Out, State, $($ty,)* $last> OnResponseStreamBeginHandler<($($ty,)* $last,), State> for F
        where
            F: FnOnce($($ty,)* $last,) -> Fut + Clone + Send + 'static,
            Fut: Future<Output = Out> + Send,
            Out: Into<Result<bool, CatBridgeError>>,
            State: Clone + Send + Sync + 'static,
            $( $ty: FromResponseStreamEvent<State> + Send, )*
            $last: FromResponseStreamEvent<State> + Send,
        {
            type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

            fn call(self, mut event: ResponseStreamEvent<State>) -> Self::Future {
                Box::pin(async move {
                    $(
                        let $ty = $ty::from_stream_event(&mut event).await?;
                    )*
                    let $last = $last::from_stream_event(&mut event).await?;
                    let outcome = self($($ty,)* $last).await;
                    outcome.into()
                })
            }
        }
    }
}
182
// Instantiate the extractor-based handler impls for callables taking
// 1 through 16 arguments. The bracketed list holds the leading argument
// types; the trailing identifier is the final argument type.
fn_to_on_connection_handler!([], T1);
fn_to_on_connection_handler!([T1], T2);
fn_to_on_connection_handler!([T1, T2], T3);
fn_to_on_connection_handler!([T1, T2, T3], T4);
fn_to_on_connection_handler!([T1, T2, T3, T4], T5);
fn_to_on_connection_handler!([T1, T2, T3, T4, T5], T6);
fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6], T7);
fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7], T8);
fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8], T9);
fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9], T10);
fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10], T11);
fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11], T12);
fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12], T13);
fn_to_on_connection_handler!(
    [T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13],
    T14
);
fn_to_on_connection_handler!(
    [T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14],
    T15
);
fn_to_on_connection_handler!(
    [
        T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15
    ],
    T16
);
210
/// Wraps a stream-begin handler so that it can be driven through the
/// [`Service`] implementations in this module.
///
/// `HandlerParamsTy` is the marker parameter list that selects which handler
/// trait implementation applies to `HandlerTy`. No value of that type is ever
/// stored.
pub struct OnStreamBeginHandlerAsService<HandlerTy, HandlerParamsTy> {
    /// The wrapped handler; the service impls clone it for every call.
    handler: HandlerTy,
    /// Ties `HandlerParamsTy` to this type without owning a value of it.
    ///
    /// A function-pointer marker is used instead of a bare
    /// `PhantomData<HandlerParamsTy>` because `fn() -> T` is `Send`, `Sync`
    /// and `Unpin` for every `T`. The adapter's auto traits therefore depend
    /// only on `HandlerTy`, not on a parameter list that is never stored.
    marker: PhantomData<fn() -> HandlerParamsTy>,
}

impl<HandlerTy, HandlerParamsTy> OnStreamBeginHandlerAsService<HandlerTy, HandlerParamsTy> {
    /// Wraps `handler` in a new adapter.
    #[must_use]
    pub(crate) fn new(handler: HandlerTy) -> Self {
        Self {
            handler,
            marker: PhantomData,
        }
    }
}

// Implemented by hand rather than derived: a derive would also demand
// `HandlerParamsTy: Clone`, although only the handler is actually cloned.
impl<HandlerTy, HandlerParamsTy> Clone for OnStreamBeginHandlerAsService<HandlerTy, HandlerParamsTy>
where
    HandlerTy: Clone,
{
    fn clone(&self) -> Self {
        Self {
            handler: self.handler.clone(),
            marker: PhantomData,
        }
    }
}
238
/// Exposes a wrapped [`OnRequestStreamBeginHandler`] as a [`Service`] over
/// [`RequestStreamEvent`]s.
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
#[cfg(feature = "clients")]
impl<H, Params, State> Service<RequestStreamEvent<State>>
    for OnStreamBeginHandlerAsService<H, Params>
where
    H: OnRequestStreamBeginHandler<Params, State> + Clone + Send + 'static,
    State: Clone + Send + Sync + 'static,
{
    type Response = bool;
    type Error = CatBridgeError;
    type Future = H::Future;

    /// Always reports ready; no state is checked before a call.
    #[inline]
    fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let ready: Result<(), Self::Error> = Ok(());
        Poll::Ready(ready)
    }

    /// Clones the wrapped handler and lets the clone consume `req`.
    fn call(&mut self, req: RequestStreamEvent<State>) -> Self::Future {
        // The handler trait takes `self` by value, so work on a fresh clone.
        self.handler.clone().call(req)
    }
}
261
/// Exposes a wrapped [`OnResponseStreamBeginHandler`] as a [`Service`] over
/// [`ResponseStreamEvent`]s.
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
#[cfg(feature = "servers")]
impl<H, Params, State> Service<ResponseStreamEvent<State>>
    for OnStreamBeginHandlerAsService<H, Params>
where
    H: OnResponseStreamBeginHandler<Params, State> + Clone + Send + 'static,
    State: Clone + Send + Sync + 'static,
{
    type Response = bool;
    type Error = CatBridgeError;
    type Future = H::Future;

    /// Always reports ready; no state is checked before a call.
    #[inline]
    fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let ready: Result<(), Self::Error> = Ok(());
        Poll::Ready(ready)
    }

    /// Clones the wrapped handler and lets the clone consume `req`.
    fn call(&mut self, req: ResponseStreamEvent<State>) -> Self::Future {
        // The handler trait takes `self` by value, so work on a fresh clone.
        self.handler.clone().call(req)
    }
}