1use crate::{
4 errors::CatBridgeError,
5 net::{Extensions, models::Response},
6};
7use fnv::FnvHasher;
8use std::{
9 fmt::{Debug, Formatter, Result as FmtResult},
10 hash::{Hash, Hasher},
11 net::SocketAddr,
12};
13use tokio::{sync::mpsc::Sender, task::Builder as TaskBuilder};
14use tower::{Service, util::BoxCloneService};
15use tracing::warn;
16use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
17
18#[derive(Clone, Debug, Valuable)]
24pub enum ResponseStreamMessage {
25 Disconnect,
27 Response(Response),
29}
30
31pub struct ResponseStreamEvent<State: Clone + Send + Sync + 'static = ()> {
33 connection_channel: Option<Sender<ResponseStreamMessage>>,
38 ext: Extensions,
40 source_address: SocketAddr,
42 stream_id: Option<u64>,
46 state: State,
48}
49
50impl ResponseStreamEvent<()> {
51 #[must_use]
52 pub const fn new(
53 connection_channel: Sender<ResponseStreamMessage>,
54 source_address: SocketAddr,
55 stream_id: Option<u64>,
56 ) -> Self {
57 Self::new_with_state(connection_channel, source_address, stream_id, ())
58 }
59
60 #[must_use]
61 pub const fn new_disconnected(source_address: SocketAddr, stream_id: Option<u64>) -> Self {
62 Self::new_disconnected_with_state(source_address, stream_id, ())
63 }
64}
65
66impl<State: Clone + Send + Sync + 'static> ResponseStreamEvent<State> {
67 #[must_use]
68 pub const fn new_with_state(
69 connection_channel: Sender<ResponseStreamMessage>,
70 source_address: SocketAddr,
71 stream_id: Option<u64>,
72 state: State,
73 ) -> Self {
74 Self {
75 connection_channel: Some(connection_channel),
76 ext: Extensions::new(),
77 source_address,
78 stream_id,
79 state,
80 }
81 }
82
83 #[must_use]
84 pub const fn new_disconnected_with_state(
85 source_address: SocketAddr,
86 stream_id: Option<u64>,
87 state: State,
88 ) -> Self {
89 Self {
90 connection_channel: None,
91 ext: Extensions::new(),
92 source_address,
93 stream_id,
94 state,
95 }
96 }
97
98 #[must_use]
103 pub fn stream_id(&self) -> u64 {
104 if let Some(id) = self.stream_id {
105 id
106 } else {
107 let mut hasher = FnvHasher::default();
108 self.source_address.hash(&mut hasher);
109 hasher.finish()
110 }
111 }
112
113 #[must_use]
117 pub const fn out_of_band_channel(&self) -> Option<&Sender<ResponseStreamMessage>> {
118 self.connection_channel.as_ref()
119 }
120
121 #[must_use]
122 pub const fn state(&self) -> &State {
123 &self.state
124 }
125 #[must_use]
126 pub fn state_mut(&mut self) -> &mut State {
127 &mut self.state
128 }
129
130 #[must_use]
131 pub const fn extensions(&self) -> &Extensions {
132 &self.ext
133 }
134 #[must_use]
135 pub fn extensions_mut(&mut self) -> &mut Extensions {
136 &mut self.ext
137 }
138
139 #[must_use]
140 pub const fn source(&self) -> &SocketAddr {
141 &self.source_address
142 }
143 #[must_use]
144 pub fn is_ipv4(&self) -> bool {
145 self.source_address.ip().is_ipv4()
146 }
147 #[must_use]
148 pub fn is_ipv6(&self) -> bool {
149 self.source_address.ip().is_ipv6()
150 }
151}
152
153impl<State: Clone + Send + Sync + 'static> Debug for ResponseStreamEvent<State> {
154 fn fmt(&self, fmt: &mut Formatter<'_>) -> FmtResult {
155 fmt.debug_struct("ResponseStreamEvent")
156 .field("source_address", &self.source_address)
159 .field("stream_id", &self.stream_id)
160 .finish_non_exhaustive()
161 }
162}
163
164const CONNECTION_EVENT_FIELDS: &[NamedField<'static>] = &[
165 NamedField::new("source_address"),
166 NamedField::new("stream_id"),
167];
168
169impl<State: Clone + Send + Sync + 'static> Structable for ResponseStreamEvent<State> {
170 fn definition(&self) -> StructDef<'_> {
171 StructDef::new_static(
172 "ResponseStreamEvent",
173 Fields::Named(CONNECTION_EVENT_FIELDS),
174 )
175 }
176}
177
178impl<State: Clone + Send + Sync + 'static> Valuable for ResponseStreamEvent<State> {
179 fn as_value(&self) -> Value<'_> {
180 Value::Structable(self)
181 }
182
183 fn visit(&self, visitor: &mut dyn Visit) {
184 visitor.visit_named_fields(&NamedValues::new(
185 CONNECTION_EVENT_FIELDS,
186 &[
187 Valuable::as_value(&format!("{}", self.source_address)),
188 Valuable::as_value(&self.stream_id),
189 ],
190 ));
191 }
192}
193
194pub type UnderlyingOnStreamBeginService<State> =
196 BoxCloneService<ResponseStreamEvent<State>, bool, CatBridgeError>;
197pub type UnderlyingOnStreamEndService<State> =
199 BoxCloneService<ResponseStreamEvent<State>, (), CatBridgeError>;
200
201pub trait FromResponseStreamEvent<State: Clone + Send + Sync + 'static>: Sized {
205 fn from_stream_event(
206 event: &mut ResponseStreamEvent<State>,
207 ) -> impl Future<Output = Result<Self, CatBridgeError>> + Send;
208}
209
210pub(crate) struct DisconnectAsyncDropServer<
215 ServiceTy: Clone
216 + Service<
217 ResponseStreamEvent<State>,
218 Future = ServiceFutureTy,
219 Response = (),
220 Error = CatBridgeError,
221 > + Send
222 + 'static,
223 ServiceFutureTy: Future<Output = Result<(), CatBridgeError>> + Send,
224 State: Clone + Send + Sync + 'static,
225> {
226 service: ServiceTy,
227 state: State,
228 source_address: SocketAddr,
229 stream_id: u64,
230}
231
232impl<
233 ServiceTy: Clone
234 + Service<
235 ResponseStreamEvent<State>,
236 Future = ServiceFutureTy,
237 Response = (),
238 Error = CatBridgeError,
239 > + Send
240 + 'static,
241 ServiceFutureTy: Future<Output = Result<(), CatBridgeError>> + Send,
242 State: Clone + Send + Sync + 'static,
243> DisconnectAsyncDropServer<ServiceTy, ServiceFutureTy, State>
244{
245 #[must_use]
246 pub const fn new(
247 service: ServiceTy,
248 state: State,
249 source_address: SocketAddr,
250 stream_id: u64,
251 ) -> Self {
252 Self {
253 service,
254 state,
255 source_address,
256 stream_id,
257 }
258 }
259}
260
261impl<
262 ServiceTy: Clone
263 + Service<
264 ResponseStreamEvent<State>,
265 Future = ServiceFutureTy,
266 Response = (),
267 Error = CatBridgeError,
268 > + Send
269 + 'static,
270 ServiceFutureTy: Future<Output = Result<(), CatBridgeError>> + Send,
271 State: Clone + Send + Sync + 'static,
272> Drop for DisconnectAsyncDropServer<ServiceTy, ServiceFutureTy, State>
273{
274 fn drop(&mut self) {
275 let addr = self.source_address;
276 let mut svc = self.service.clone();
277 let state = self.state.clone();
278 let stream_id = self.stream_id;
279
280 if let Err(cause) = TaskBuilder::new().name("cat_dev::net::server::models::DisconnectAsyncDrop").spawn(async move {
281 if let Err(cause) = svc.call(
282 ResponseStreamEvent::new_disconnected_with_state(addr, Some(stream_id), state),
283 ).await {
284 warn!(
285 ?cause,
286 client.address = %addr,
287 server.stream_id = stream_id,
288 "On stream end task has failed during it's processing, and may need to be cleaned up manually.",
289 );
290 }
291 }) {
292 warn!(
293 ?cause,
294 client.address = %addr,
295 server.stream_id = stream_id,
296 "On Stream end task has failed to be spawned, and will not be completed!",
297 );
298 }
299 }
300}