1use crate::errors::CatBridgeError;
5use std::{
6 marker::PhantomData,
7 pin::Pin,
8 task::{Context, Poll},
9};
10use tower::Service;
11
12#[cfg(feature = "clients")]
13use crate::net::client::models::{FromRequestStreamEvent, RequestStreamEvent};
14#[cfg(feature = "servers")]
15use crate::net::server::models::{FromResponseStreamEvent, ResponseStreamEvent};
16
17#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
18#[cfg(feature = "clients")]
19pub trait OnRequestStreamEndHandler<ParamTy, State: Clone + Send + Sync + 'static> {
28 type Future: Future<Output = Result<(), CatBridgeError>> + Send + 'static;
29
30 fn call(self, event: RequestStreamEvent<State>) -> Self::Future;
31}
32
33#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
34#[cfg(feature = "clients")]
35impl<UnderlyingFnType, FnFutureTy, ResponseTy, State> OnRequestStreamEndHandler<(), State>
37 for UnderlyingFnType
38where
39 UnderlyingFnType: FnOnce() -> FnFutureTy + Clone + Send + 'static,
40 FnFutureTy: Future<Output = ResponseTy> + Send,
41 ResponseTy: Into<Result<(), CatBridgeError>>,
42 State: Clone + Send + Sync + 'static,
43{
44 type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;
45
46 fn call(self, _event: RequestStreamEvent<State>) -> Self::Future {
47 Box::pin(async move { self().await.into() })
48 }
49}
50
51#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
53#[cfg(feature = "clients")]
54impl<UnderlyingFnType, FnFutureTy, ResponseTy, State, ArgTy> OnRequestStreamEndHandler<ArgTy, State>
55 for UnderlyingFnType
56where
57 UnderlyingFnType: FnOnce(ArgTy) -> FnFutureTy + Clone + Send + 'static,
58 FnFutureTy: Future<Output = ResponseTy> + Send,
59 ResponseTy: Into<Result<(), CatBridgeError>>,
60 ArgTy: From<RequestStreamEvent<State>> + Send,
61 State: Clone + Send + Sync + 'static,
62{
63 type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;
64
65 fn call(self, event: RequestStreamEvent<State>) -> Self::Future {
66 Box::pin(async move { self(ArgTy::from(event)).await.into() })
67 }
68}
69
70#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
71#[cfg(feature = "servers")]
72pub trait OnResponseStreamEndHandler<ParamTy, State: Clone + Send + Sync + 'static> {
81 type Future: Future<Output = Result<(), CatBridgeError>> + Send + 'static;
82
83 fn call(self, event: ResponseStreamEvent<State>) -> Self::Future;
84}
85
86#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
87#[cfg(feature = "servers")]
88impl<UnderlyingFnType, FnFutureTy, ResponseTy, State> OnResponseStreamEndHandler<(), State>
90 for UnderlyingFnType
91where
92 UnderlyingFnType: FnOnce() -> FnFutureTy + Clone + Send + 'static,
93 FnFutureTy: Future<Output = ResponseTy> + Send,
94 ResponseTy: Into<Result<(), CatBridgeError>>,
95 State: Clone + Send + Sync + 'static,
96{
97 type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;
98
99 fn call(self, _event: ResponseStreamEvent<State>) -> Self::Future {
100 Box::pin(async move { self().await.into() })
101 }
102}
103
104#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
106#[cfg(feature = "servers")]
107impl<UnderlyingFnType, FnFutureTy, ResponseTy, State, ArgTy>
108 OnResponseStreamEndHandler<ArgTy, State> for UnderlyingFnType
109where
110 UnderlyingFnType: FnOnce(ArgTy) -> FnFutureTy + Clone + Send + 'static,
111 FnFutureTy: Future<Output = ResponseTy> + Send,
112 ResponseTy: Into<Result<(), CatBridgeError>>,
113 ArgTy: From<ResponseStreamEvent<State>> + Send,
114 State: Clone + Send + Sync + 'static,
115{
116 type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;
117
118 fn call(self, event: ResponseStreamEvent<State>) -> Self::Future {
119 Box::pin(async move { self(ArgTy::from(event)).await.into() })
120 }
121}
122
123macro_rules! fn_to_on_disconnect_handler {
124 (
125 [$($ty:ident),*], $last:ident
126 ) => {
127 #[allow(non_snake_case, unused_mut)]
128 #[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
129 #[cfg(feature = "clients")]
130 impl<UnderlyingFnType, FnFutureTy, OutputTy, State, $($ty,)* $last> OnRequestStreamEndHandler<($($ty,)* $last,), State> for UnderlyingFnType
131 where
132 UnderlyingFnType: FnOnce($($ty,)* $last,) -> FnFutureTy + Clone + Send + 'static,
133 FnFutureTy: Future<Output = OutputTy> + Send,
134 OutputTy: Into<Result<(), CatBridgeError>>,
135 State: Clone + Send + Sync + 'static,
136 $( $ty: FromRequestStreamEvent<State> + Send, )*
137 $last: FromRequestStreamEvent<State> + Send,
138 {
139 type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;
140
141 fn call(self, mut req: RequestStreamEvent<State>) -> Self::Future {
142 Box::pin(async move {
143 $(
144 let $ty = $ty::from_stream_event(&mut req).await?;
145 )*
146 let $last = $last::from_stream_event(&mut req).await?;
147 let res = self($($ty,)* $last).await;
148 res.into()
149 })
150 }
151 }
152
153 #[allow(non_snake_case, unused_mut)]
154 #[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
155 #[cfg(feature = "servers")]
156 impl<UnderlyingFnType, FnFutureTy, OutputTy, State, $($ty,)* $last> OnResponseStreamEndHandler<($($ty,)* $last,), State> for UnderlyingFnType
157 where
158 UnderlyingFnType: FnOnce($($ty,)* $last,) -> FnFutureTy + Clone + Send + 'static,
159 FnFutureTy: Future<Output = OutputTy> + Send,
160 OutputTy: Into<Result<(), CatBridgeError>>,
161 State: Clone + Send + Sync + 'static,
162 $( $ty: FromResponseStreamEvent<State> + Send, )*
163 $last: FromResponseStreamEvent<State> + Send,
164 {
165 type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;
166
167 fn call(self, mut req: ResponseStreamEvent<State>) -> Self::Future {
168 Box::pin(async move {
169 $(
170 let $ty = $ty::from_stream_event(&mut req).await?;
171 )*
172 let $last = $last::from_stream_event(&mut req).await?;
173 let res = self($($ty,)* $last).await;
174 res.into()
175 })
176 }
177 }
178 }
179}
180
181fn_to_on_disconnect_handler!([], T1);
182fn_to_on_disconnect_handler!([T1], T2);
183fn_to_on_disconnect_handler!([T1, T2], T3);
184fn_to_on_disconnect_handler!([T1, T2, T3], T4);
185fn_to_on_disconnect_handler!([T1, T2, T3, T4], T5);
186fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5], T6);
187fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6], T7);
188fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7], T8);
189fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8], T9);
190fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9], T10);
191fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10], T11);
192fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11], T12);
193fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12], T13);
194fn_to_on_disconnect_handler!(
195 [T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13],
196 T14
197);
198fn_to_on_disconnect_handler!(
199 [T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14],
200 T15
201);
202fn_to_on_disconnect_handler!(
203 [
204 T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15
205 ],
206 T16
207);
208
209pub struct OnStreamEndHandlerAsService<HandlerTy, HandlerParamsTy> {
211 handler: HandlerTy,
212 marker: PhantomData<HandlerParamsTy>,
213}
214
215impl<HandlerTy, HandlerParamsTy> OnStreamEndHandlerAsService<HandlerTy, HandlerParamsTy> {
216 #[must_use]
217 pub(crate) fn new(handler: HandlerTy) -> Self {
218 Self {
219 handler,
220 marker: PhantomData,
221 }
222 }
223}
224
225impl<HandlerTy, HandlerParamsTy> Clone for OnStreamEndHandlerAsService<HandlerTy, HandlerParamsTy>
226where
227 HandlerTy: Clone,
228{
229 fn clone(&self) -> Self {
230 Self {
231 handler: self.handler.clone(),
232 marker: PhantomData,
233 }
234 }
235}
236
237#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
238#[cfg(feature = "clients")]
239impl<HandlerTy, HandlerParamsTy, State> Service<RequestStreamEvent<State>>
240 for OnStreamEndHandlerAsService<HandlerTy, HandlerParamsTy>
241where
242 HandlerTy: OnRequestStreamEndHandler<HandlerParamsTy, State> + Clone + Send + 'static,
243 State: Clone + Send + Sync + 'static,
244{
245 type Response = ();
246 type Error = CatBridgeError;
247 type Future = HandlerTy::Future;
248
249 #[inline]
250 fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
251 Poll::Ready(Ok(()))
252 }
253
254 fn call(&mut self, req: RequestStreamEvent<State>) -> Self::Future {
255 let handler = self.handler.clone();
256 handler.call(req)
257 }
258}
259
260#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
261#[cfg(feature = "servers")]
262impl<HandlerTy, HandlerParamsTy, State> Service<ResponseStreamEvent<State>>
263 for OnStreamEndHandlerAsService<HandlerTy, HandlerParamsTy>
264where
265 HandlerTy: OnResponseStreamEndHandler<HandlerParamsTy, State> + Clone + Send + 'static,
266 State: Clone + Send + Sync + 'static,
267{
268 type Response = ();
269 type Error = CatBridgeError;
270 type Future = HandlerTy::Future;
271
272 #[inline]
273 fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
274 Poll::Ready(Ok(()))
275 }
276
277 fn call(&mut self, req: ResponseStreamEvent<State>) -> Self::Future {
278 let handler = self.handler.clone();
279 handler.call(req)
280 }
281}