cat_dev/net/handlers/
on_stream_end_handlers.rs

1//! Utilities to turn functions into "on stream end"/"on connection"
2//! handlers.
3
4use crate::errors::CatBridgeError;
5use std::{
6	marker::PhantomData,
7	pin::Pin,
8	task::{Context, Poll},
9};
10use tower::Service;
11
12#[cfg(feature = "clients")]
13use crate::net::client::models::{FromRequestStreamEvent, RequestStreamEvent};
14#[cfg(feature = "servers")]
15use crate::net::server::models::{FromResponseStreamEvent, ResponseStreamEvent};
16
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
#[cfg(feature = "clients")]
/// Handler that runs when a request stream ends (i.e. on disconnect).
///
/// This is intentionally a very thin adapter between a plain function and
/// the actual stream-end handler. It exists as a trait so it can be
/// implemented for functions taking a varying number of arguments, each of
/// which is built from the stream event (see `FromRequestStreamEvent`).
///
/// `ParamTy` exists purely so the per-arity implementations do not conflict
/// with each other; none of our code actually needs it.
pub trait OnRequestStreamEndHandler<ParamTy, State>
where
	State: Clone + Send + Sync + 'static,
{
	/// The future produced by [`Self::call`]; it resolves to the handler's
	/// result.
	type Future: Future<Output = Result<(), CatBridgeError>> + Send + 'static;

	/// Consume the handler and run it for the given stream event.
	fn call(self, event: RequestStreamEvent<State>) -> Self::Future;
}
32
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
#[cfg(feature = "clients")]
/// Any async function taking no arguments can be used as a handler.
///
/// The stream event is ignored, and whatever the function resolves to is
/// converted into the handler result through `Into`.
impl<Func, Fut, Out, State> OnRequestStreamEndHandler<(), State> for Func
where
	Func: FnOnce() -> Fut + Clone + Send + 'static,
	Fut: Future<Output = Out> + Send,
	Out: Into<Result<(), CatBridgeError>>,
	State: Clone + Send + Sync + 'static,
{
	type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;

	fn call(self, _event: RequestStreamEvent<State>) -> Self::Future {
		Box::pin(async move {
			// The function is only invoked once the returned future is polled.
			let output = self().await;
			output.into()
		})
	}
}
50
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
#[cfg(feature = "clients")]
/// Any async function taking exactly one argument that can be built by
/// consuming the whole stream event (via `From`) can be used as a handler.
impl<Func, Fut, Out, State, ArgTy> OnRequestStreamEndHandler<ArgTy, State> for Func
where
	Func: FnOnce(ArgTy) -> Fut + Clone + Send + 'static,
	Fut: Future<Output = Out> + Send,
	Out: Into<Result<(), CatBridgeError>>,
	ArgTy: From<RequestStreamEvent<State>> + Send,
	State: Clone + Send + Sync + 'static,
{
	type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;

	fn call(self, event: RequestStreamEvent<State>) -> Self::Future {
		Box::pin(async move {
			// The event is handed over wholesale; nothing else reads it.
			let argument = ArgTy::from(event);
			let output = self(argument).await;
			output.into()
		})
	}
}
69
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
#[cfg(feature = "servers")]
/// Handler that runs when a response stream ends (i.e. on disconnect).
///
/// This is intentionally a very thin adapter between a plain function and
/// the actual stream-end handler. It exists as a trait so it can be
/// implemented for functions taking a varying number of arguments, each of
/// which is built from the stream event (see `FromResponseStreamEvent`).
///
/// `ParamTy` exists purely so the per-arity implementations do not conflict
/// with each other; none of our code actually needs it.
pub trait OnResponseStreamEndHandler<ParamTy, State>
where
	State: Clone + Send + Sync + 'static,
{
	/// The future produced by [`Self::call`]; it resolves to the handler's
	/// result.
	type Future: Future<Output = Result<(), CatBridgeError>> + Send + 'static;

	/// Consume the handler and run it for the given stream event.
	fn call(self, event: ResponseStreamEvent<State>) -> Self::Future;
}
85
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
#[cfg(feature = "servers")]
/// Any async function taking no arguments can be used as a handler.
///
/// The stream event is ignored, and whatever the function resolves to is
/// converted into the handler result through `Into`.
impl<Func, Fut, Out, State> OnResponseStreamEndHandler<(), State> for Func
where
	Func: FnOnce() -> Fut + Clone + Send + 'static,
	Fut: Future<Output = Out> + Send,
	Out: Into<Result<(), CatBridgeError>>,
	State: Clone + Send + Sync + 'static,
{
	type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;

	fn call(self, _event: ResponseStreamEvent<State>) -> Self::Future {
		Box::pin(async move {
			// The function is only invoked once the returned future is polled.
			let output = self().await;
			output.into()
		})
	}
}
103
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
#[cfg(feature = "servers")]
/// Any async function taking exactly one argument that can be built by
/// consuming the whole stream event (via `From`) can be used as a handler.
impl<Func, Fut, Out, State, ArgTy> OnResponseStreamEndHandler<ArgTy, State> for Func
where
	Func: FnOnce(ArgTy) -> Fut + Clone + Send + 'static,
	Fut: Future<Output = Out> + Send,
	Out: Into<Result<(), CatBridgeError>>,
	ArgTy: From<ResponseStreamEvent<State>> + Send,
	State: Clone + Send + Sync + 'static,
{
	type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;

	fn call(self, event: ResponseStreamEvent<State>) -> Self::Future {
		Box::pin(async move {
			// The event is handed over wholesale; nothing else reads it.
			let argument = ArgTy::from(event);
			let output = self(argument).await;
			output.into()
		})
	}
}
122
// Generates, for one argument list, the request-side (feature `clients`) and
// response-side (feature `servers`) handler implementations for async
// functions whose arguments are each extracted from the stream event.
//
// Invoked as `fn_to_on_disconnect_handler!([A, B, ...], Last)`: the bracketed
// identifiers are the leading argument types and `Last` is the final one, so
// every generated implementation takes at least one argument.
macro_rules! fn_to_on_disconnect_handler {
	(
		[$($head:ident),*], $tail:ident
	) => {
		#[allow(non_snake_case, unused_mut)]
		#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
		#[cfg(feature = "clients")]
		impl<Func, Fut, Out, State, $($head,)* $tail> OnRequestStreamEndHandler<($($head,)* $tail,), State> for Func
		where
			Func: FnOnce($($head,)* $tail,) -> Fut + Clone + Send + 'static,
			Fut: Future<Output = Out> + Send,
			Out: Into<Result<(), CatBridgeError>>,
			State: Clone + Send + Sync + 'static,
			$( $head: FromRequestStreamEvent<State> + Send, )*
			$tail: FromRequestStreamEvent<State> + Send,
		{
			type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;

			fn call(self, mut event: RequestStreamEvent<State>) -> Self::Future {
				Box::pin(async move {
					// Each type identifier doubles as the name of the local holding
					// its extracted value (hence `non_snake_case`). Extraction runs
					// in declaration order and the first failure is returned early.
					$(
						let $head = $head::from_stream_event(&mut event).await?;
					)*
					let $tail = $tail::from_stream_event(&mut event).await?;
					self($($head,)* $tail).await.into()
				})
			}
		}

		#[allow(non_snake_case, unused_mut)]
		#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
		#[cfg(feature = "servers")]
		impl<Func, Fut, Out, State, $($head,)* $tail> OnResponseStreamEndHandler<($($head,)* $tail,), State> for Func
		where
			Func: FnOnce($($head,)* $tail,) -> Fut + Clone + Send + 'static,
			Fut: Future<Output = Out> + Send,
			Out: Into<Result<(), CatBridgeError>>,
			State: Clone + Send + Sync + 'static,
			$( $head: FromResponseStreamEvent<State> + Send, )*
			$tail: FromResponseStreamEvent<State> + Send,
		{
			type Future = Pin<Box<dyn Future<Output = Result<(), CatBridgeError>> + Send>>;

			fn call(self, mut event: ResponseStreamEvent<State>) -> Self::Future {
				Box::pin(async move {
					// Same scheme as the request side: extract every argument in
					// order, bail out on the first error, then run the function.
					$(
						let $head = $head::from_stream_event(&mut event).await?;
					)*
					let $tail = $tail::from_stream_event(&mut event).await?;
					self($($head,)* $tail).await.into()
				})
			}
		}
	}
}
180
// Instantiate the extractor-based handler implementations for functions
// taking between one and sixteen arguments.
fn_to_on_disconnect_handler!([], T1);
fn_to_on_disconnect_handler!([T1], T2);
fn_to_on_disconnect_handler!([T1, T2], T3);
fn_to_on_disconnect_handler!([T1, T2, T3], T4);
fn_to_on_disconnect_handler!([T1, T2, T3, T4], T5);
fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5], T6);
fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6], T7);
fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7], T8);
fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8], T9);
fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9], T10);
fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10], T11);
fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11], T12);
fn_to_on_disconnect_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12], T13);
fn_to_on_disconnect_handler!(
	[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13],
	T14
);
fn_to_on_disconnect_handler!(
	[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14],
	T15
);
fn_to_on_disconnect_handler!(
	[
		T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15
	],
	T16
);
208
/// A wrapper around a handler to implement a service.
///
/// `HandlerParamsTy` only records which argument list the wrapped handler
/// was implemented for; no value of that type is ever stored.
pub struct OnStreamEndHandlerAsService<HandlerTy, HandlerParamsTy> {
	/// The wrapped handler.
	handler: HandlerTy,
	/// Zero-sized marker tying the wrapper to its handler parameter list.
	///
	/// Spelled as `fn() -> HandlerParamsTy` rather than `HandlerParamsTy` so
	/// the wrapper's `Send`/`Sync` depend on `HandlerTy` alone and not on
	/// argument types that are never actually held.
	marker: PhantomData<fn() -> HandlerParamsTy>,
}

impl<HandlerTy, HandlerParamsTy> OnStreamEndHandlerAsService<HandlerTy, HandlerParamsTy> {
	/// Wrap `handler` so it can be used as a service.
	#[must_use]
	pub(crate) fn new(handler: HandlerTy) -> Self {
		Self {
			handler,
			marker: PhantomData,
		}
	}
}

// Implemented by hand rather than derived: a derive would additionally demand
// `HandlerParamsTy: Clone`, even though only the handler is ever cloned.
impl<HandlerTy, HandlerParamsTy> Clone for OnStreamEndHandlerAsService<HandlerTy, HandlerParamsTy>
where
	HandlerTy: Clone,
{
	fn clone(&self) -> Self {
		Self {
			handler: self.handler.clone(),
			marker: PhantomData,
		}
	}
}
236
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
#[cfg(feature = "clients")]
/// Expose a wrapped request stream-end handler as a [`Service`].
impl<Handler, Params, State> Service<RequestStreamEvent<State>>
	for OnStreamEndHandlerAsService<Handler, Params>
where
	Handler: OnRequestStreamEndHandler<Params, State> + Clone + Send + 'static,
	State: Clone + Send + Sync + 'static,
{
	type Response = ();
	type Error = CatBridgeError;
	type Future = <Handler as OnRequestStreamEndHandler<Params, State>>::Future;

	/// This service reports itself as ready unconditionally.
	#[inline]
	fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	/// Run the handler for `req`.
	fn call(&mut self, req: RequestStreamEvent<State>) -> Self::Future {
		// The handler's `call` takes `self` by value, so each event gets its
		// own clone of the stored handler.
		<Handler as OnRequestStreamEndHandler<Params, State>>::call(self.handler.clone(), req)
	}
}
259
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
#[cfg(feature = "servers")]
/// Expose a wrapped response stream-end handler as a [`Service`].
impl<Handler, Params, State> Service<ResponseStreamEvent<State>>
	for OnStreamEndHandlerAsService<Handler, Params>
where
	Handler: OnResponseStreamEndHandler<Params, State> + Clone + Send + 'static,
	State: Clone + Send + Sync + 'static,
{
	type Response = ();
	type Error = CatBridgeError;
	type Future = <Handler as OnResponseStreamEndHandler<Params, State>>::Future;

	/// This service reports itself as ready unconditionally.
	#[inline]
	fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	/// Run the handler for `req`.
	fn call(&mut self, req: ResponseStreamEvent<State>) -> Self::Future {
		// The handler's `call` takes `self` by value, so each event gets its
		// own clone of the stored handler.
		<Handler as OnResponseStreamEndHandler<Params, State>>::call(self.handler.clone(), req)
	}
}