cat_dev/net/handlers/
on_stream_begin_handlers.rs

1//! Utilities to turn functions into "on stream begin"/"on connection"
2//! handlers.
3
4use crate::errors::CatBridgeError;
5use std::{
6	marker::PhantomData,
7	pin::Pin,
8	task::{Context, Poll},
9};
10use tower::Service;
11
12#[cfg(feature = "clients")]
13use crate::net::client::models::{FromRequestStreamEvent, RequestStreamEvent};
14#[cfg(feature = "servers")]
15use crate::net::server::models::{FromResponseStreamEvent, ResponseStreamEvent};
16
/// Something that can be run once a request stream has begun (an
/// "on connection" event).
///
/// This is meant to be a very thin shim between a plain function and the
/// handler that ultimately gets invoked. It exists as a trait so it can be
/// implemented for functions taking differing numbers of arguments, each
/// argument being produced from the stream event.
///
/// `ParamTy` only exists to stop those implementations from conflicting with
/// one another; none of our code actually needs it.
#[cfg(feature = "clients")]
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
pub trait OnRequestStreamBeginHandler<ParamTy, State>
where
	State: Clone + Send + Sync + 'static,
{
	/// The future returned by [`Self::call`].
	type Future: Future<Output = Result<bool, CatBridgeError>> + Send + 'static;

	/// Consume the handler and run it against the given stream event.
	fn call(self, event: RequestStreamEvent<State>) -> Self::Future;
}
33
/// Async functions that take no arguments are handlers too; the stream
/// event is simply never handed to them.
#[cfg(feature = "clients")]
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
impl<HandlerFn, HandlerFut, OutputTy, State> OnRequestStreamBeginHandler<(), State> for HandlerFn
where
	HandlerFn: FnOnce() -> HandlerFut + Clone + Send + 'static,
	HandlerFut: Future<Output = OutputTy> + Send,
	OutputTy: Into<Result<bool, CatBridgeError>>,
	State: Clone + Send + Sync + 'static,
{
	type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

	fn call(self, _event: RequestStreamEvent<State>) -> Self::Future {
		// The function is only invoked once the boxed future is first polled.
		Box::pin(async move {
			let outcome = self().await;
			outcome.into()
		})
	}
}
51
/// Async functions whose single argument can be built by consuming the whole
/// stream event (through `From`) are handlers.
#[cfg(feature = "clients")]
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
impl<HandlerFn, HandlerFut, OutputTy, State, ArgTy> OnRequestStreamBeginHandler<ArgTy, State>
	for HandlerFn
where
	HandlerFn: FnOnce(ArgTy) -> HandlerFut + Clone + Send + 'static,
	HandlerFut: Future<Output = OutputTy> + Send,
	OutputTy: Into<Result<bool, CatBridgeError>>,
	ArgTy: From<RequestStreamEvent<State>> + Send,
	State: Clone + Send + Sync + 'static,
{
	type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

	fn call(self, event: RequestStreamEvent<State>) -> Self::Future {
		// Both the conversion and the function call happen when the boxed
		// future is polled, not when `call` returns.
		Box::pin(async move {
			let argument = ArgTy::from(event);
			let outcome = self(argument).await;
			outcome.into()
		})
	}
}
70
/// Something that can be run once a response stream has begun (an
/// "on connection" event).
///
/// This is meant to be a very thin shim between a plain function and the
/// handler that ultimately gets invoked. It exists as a trait so it can be
/// implemented for functions taking differing numbers of arguments, each
/// argument being produced from the stream event.
///
/// `ParamTy` only exists to stop those implementations from conflicting with
/// one another; none of our code actually needs it.
#[cfg(feature = "servers")]
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
pub trait OnResponseStreamBeginHandler<ParamTy, State>
where
	State: Clone + Send + Sync + 'static,
{
	/// The future returned by [`Self::call`].
	type Future: Future<Output = Result<bool, CatBridgeError>> + Send + 'static;

	/// Consume the handler and run it against the given stream event.
	fn call(self, event: ResponseStreamEvent<State>) -> Self::Future;
}
87
/// Async functions that take no arguments are handlers too; the stream
/// event is simply never handed to them.
#[cfg(feature = "servers")]
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
impl<HandlerFn, HandlerFut, OutputTy, State> OnResponseStreamBeginHandler<(), State> for HandlerFn
where
	HandlerFn: FnOnce() -> HandlerFut + Clone + Send + 'static,
	HandlerFut: Future<Output = OutputTy> + Send,
	OutputTy: Into<Result<bool, CatBridgeError>>,
	State: Clone + Send + Sync + 'static,
{
	type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

	fn call(self, _event: ResponseStreamEvent<State>) -> Self::Future {
		// The function is only invoked once the boxed future is first polled.
		Box::pin(async move {
			let outcome = self().await;
			outcome.into()
		})
	}
}
105
/// Async functions whose single argument can be built by consuming the whole
/// stream event (through `From`) are handlers.
#[cfg(feature = "servers")]
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
impl<HandlerFn, HandlerFut, OutputTy, State, ArgTy> OnResponseStreamBeginHandler<ArgTy, State>
	for HandlerFn
where
	HandlerFn: FnOnce(ArgTy) -> HandlerFut + Clone + Send + 'static,
	HandlerFut: Future<Output = OutputTy> + Send,
	OutputTy: Into<Result<bool, CatBridgeError>>,
	ArgTy: From<ResponseStreamEvent<State>> + Send,
	State: Clone + Send + Sync + 'static,
{
	type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

	fn call(self, event: ResponseStreamEvent<State>) -> Self::Future {
		// Both the conversion and the function call happen when the boxed
		// future is polled, not when `call` returns.
		Box::pin(async move {
			let argument = ArgTy::from(event);
			let outcome = self(argument).await;
			outcome.into()
		})
	}
}
124
// Generates, for one argument list, the request-side (`clients`) and
// response-side (`servers`) handler implementations for async functions.
//
// Invoked as `fn_to_on_connection_handler!([A, B, ...], Last)`. Every argument
// is built from the stream event, left to right, and the first failure is
// returned (via `?`) without calling the function.
macro_rules! fn_to_on_connection_handler {
	(
		[$($ty:ident),*], $last:ident
	) => {
		// `non_snake_case`: each type identifier is reused as the name of the
		// value built for it.
		// NOTE(review): `unused_mut` looks unnecessary, the event is always
		// mutably borrowed for `$last` -- confirm before removing.
		#[allow(non_snake_case, unused_mut)]
		#[cfg(feature = "clients")]
		#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
		impl<UnderlyingFnType, FnFutureTy, OutputTy, State, $($ty,)* $last> OnRequestStreamBeginHandler<($($ty,)* $last,), State> for UnderlyingFnType
		where
			UnderlyingFnType: FnOnce($($ty,)* $last,) -> FnFutureTy + Clone + Send + 'static,
			FnFutureTy: Future<Output = OutputTy> + Send,
			OutputTy: Into<Result<bool, CatBridgeError>>,
			State: Clone + Send + Sync + 'static,
			$( $ty: FromRequestStreamEvent<State> + Send, )*
			$last: FromRequestStreamEvent<State> + Send,
		{
			type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

			fn call(self, mut event: RequestStreamEvent<State>) -> Self::Future {
				Box::pin(async move {
					$(
						let $ty = $ty::from_stream_event(&mut event).await?;
					)*
					let $last = $last::from_stream_event(&mut event).await?;
					self($($ty,)* $last).await.into()
				})
			}
		}

		// See the note on the `clients` implementation for why these lints
		// are allowed.
		#[allow(non_snake_case, unused_mut)]
		#[cfg(feature = "servers")]
		#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
		impl<UnderlyingFnType, FnFutureTy, OutputTy, State, $($ty,)* $last> OnResponseStreamBeginHandler<($($ty,)* $last,), State> for UnderlyingFnType
		where
			UnderlyingFnType: FnOnce($($ty,)* $last,) -> FnFutureTy + Clone + Send + 'static,
			FnFutureTy: Future<Output = OutputTy> + Send,
			OutputTy: Into<Result<bool, CatBridgeError>>,
			State: Clone + Send + Sync + 'static,
			$( $ty: FromResponseStreamEvent<State> + Send, )*
			$last: FromResponseStreamEvent<State> + Send,
		{
			type Future = Pin<Box<dyn Future<Output = Result<bool, CatBridgeError>> + Send>>;

			fn call(self, mut event: ResponseStreamEvent<State>) -> Self::Future {
				Box::pin(async move {
					$(
						let $ty = $ty::from_stream_event(&mut event).await?;
					)*
					let $last = $last::from_stream_event(&mut event).await?;
					self($($ty,)* $last).await.into()
				})
			}
		}
	}
}
182
183fn_to_on_connection_handler!([], T1);
184fn_to_on_connection_handler!([T1], T2);
185fn_to_on_connection_handler!([T1, T2], T3);
186fn_to_on_connection_handler!([T1, T2, T3], T4);
187fn_to_on_connection_handler!([T1, T2, T3, T4], T5);
188fn_to_on_connection_handler!([T1, T2, T3, T4, T5], T6);
189fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6], T7);
190fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7], T8);
191fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8], T9);
192fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9], T10);
193fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10], T11);
194fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11], T12);
195fn_to_on_connection_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12], T13);
196fn_to_on_connection_handler!(
197	[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13],
198	T14
199);
200fn_to_on_connection_handler!(
201	[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14],
202	T15
203);
204fn_to_on_connection_handler!(
205	[
206		T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15
207	],
208	T16
209);
210
/// A wrapper around a handler to implement a service.
///
/// `HandlerParamsTy` records which handler implementation (i.e. which
/// argument list) the wrapped handler is used through; no value of that type
/// is ever stored.
pub struct OnStreamBeginHandlerAsService<HandlerTy, HandlerParamsTy> {
	/// The wrapped handler; cloned whenever an owned copy is needed.
	handler: HandlerTy,
	/// Ties `HandlerParamsTy` to this type without owning one.
	///
	/// `fn() -> HandlerParamsTy` is used rather than `HandlerParamsTy` itself
	/// so that whether the wrapper is `Send`/`Sync` depends on the handler
	/// alone, and not on argument types that are never held here.
	marker: PhantomData<fn() -> HandlerParamsTy>,
}

impl<HandlerTy, HandlerParamsTy> OnStreamBeginHandlerAsService<HandlerTy, HandlerParamsTy> {
	/// Wrap `handler` so it can be used as a service.
	#[must_use]
	pub(crate) fn new(handler: HandlerTy) -> Self {
		Self {
			handler,
			marker: PhantomData,
		}
	}
}

/// Implemented by hand (instead of derived) so that only the handler has to
/// be `Clone`; `HandlerParamsTy` carries no bound.
impl<HandlerTy, HandlerParamsTy> Clone for OnStreamBeginHandlerAsService<HandlerTy, HandlerParamsTy>
where
	HandlerTy: Clone,
{
	fn clone(&self) -> Self {
		Self::new(self.handler.clone())
	}
}
238
/// Lets a wrapped request-stream handler be driven as a [`Service`] over
/// [`RequestStreamEvent`]s.
#[cfg(feature = "clients")]
#[cfg_attr(docsrs, doc(cfg(feature = "clients")))]
impl<HandlerTy, HandlerParamsTy, State> Service<RequestStreamEvent<State>>
	for OnStreamBeginHandlerAsService<HandlerTy, HandlerParamsTy>
where
	HandlerTy: OnRequestStreamBeginHandler<HandlerParamsTy, State> + Clone + Send + 'static,
	State: Clone + Send + Sync + 'static,
{
	type Response = bool;
	type Error = CatBridgeError;
	type Future = <HandlerTy as OnRequestStreamBeginHandler<HandlerParamsTy, State>>::Future;

	/// Always reports ready.
	#[inline]
	fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	/// Run the handler for `event`.
	fn call(&mut self, event: RequestStreamEvent<State>) -> Self::Future {
		// The handler is consumed by value when called, but we only have
		// `&mut self`, so every call works on a fresh clone.
		<HandlerTy as OnRequestStreamBeginHandler<HandlerParamsTy, State>>::call(
			self.handler.clone(),
			event,
		)
	}
}
261
/// Lets a wrapped response-stream handler be driven as a [`Service`] over
/// [`ResponseStreamEvent`]s.
#[cfg(feature = "servers")]
#[cfg_attr(docsrs, doc(cfg(feature = "servers")))]
impl<HandlerTy, HandlerParamsTy, State> Service<ResponseStreamEvent<State>>
	for OnStreamBeginHandlerAsService<HandlerTy, HandlerParamsTy>
where
	HandlerTy: OnResponseStreamBeginHandler<HandlerParamsTy, State> + Clone + Send + 'static,
	State: Clone + Send + Sync + 'static,
{
	type Response = bool;
	type Error = CatBridgeError;
	type Future = <HandlerTy as OnResponseStreamBeginHandler<HandlerParamsTy, State>>::Future;

	/// Always reports ready.
	#[inline]
	fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	/// Run the handler for `event`.
	fn call(&mut self, event: ResponseStreamEvent<State>) -> Self::Future {
		// The handler is consumed by value when called, but we only have
		// `&mut self`, so every call works on a fresh clone.
		<HandlerTy as OnResponseStreamBeginHandler<HandlerParamsTy, State>>::call(
			self.handler.clone(),
			event,
		)
	}
}