cat_dev/net/server/
models.rs

1//! Models specifically for the TCP Server, as opposed to the TCP Client.
2
3use crate::{
4	errors::CatBridgeError,
5	net::{Extensions, models::Response},
6};
7use fnv::FnvHasher;
8use std::{
9	fmt::{Debug, Formatter, Result as FmtResult},
10	hash::{Hash, Hasher},
11	net::SocketAddr,
12};
13use tokio::{sync::mpsc::Sender, task::Builder as TaskBuilder};
14use tower::{Service, util::BoxCloneService};
15use tracing::warn;
16use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
17
18/// A channel to send messages to a connection from an "out of bound"
19/// location.
20///
21/// This way you could do things like health check, or PUB/SUB style
22/// architectures.
23#[derive(Clone, Debug, Valuable)]
24pub enum ResponseStreamMessage {
25	/// Request a disconnection without any response.
26	Disconnect,
27	/// Send an actual response back out.
28	Response(Response),
29}
30
31/// An event that is sent when a new connection is created, or destroyed.
32pub struct ResponseStreamEvent<State: Clone + Send + Sync + 'static = ()> {
33	/// A channel to send responses out-of-band responses on.
34	///
35	/// Allows for things like health-checking, and broadcasts of friend messages
36	/// out-of-band.
37	connection_channel: Option<Sender<ResponseStreamMessage>>,
38	/// The list of extensions to attach to this event.
39	ext: Extensions,
40	/// The source address of the initiator
41	source_address: SocketAddr,
42	/// A unique stream id.
43	///
44	/// On UDP where we don't have streams this is based off of the source-address.
45	stream_id: Option<u64>,
46	/// The app state on the current connection.
47	state: State,
48}
49
50impl ResponseStreamEvent<()> {
51	#[must_use]
52	pub const fn new(
53		connection_channel: Sender<ResponseStreamMessage>,
54		source_address: SocketAddr,
55		stream_id: Option<u64>,
56	) -> Self {
57		Self::new_with_state(connection_channel, source_address, stream_id, ())
58	}
59
60	#[must_use]
61	pub const fn new_disconnected(source_address: SocketAddr, stream_id: Option<u64>) -> Self {
62		Self::new_disconnected_with_state(source_address, stream_id, ())
63	}
64}
65
66impl<State: Clone + Send + Sync + 'static> ResponseStreamEvent<State> {
67	#[must_use]
68	pub const fn new_with_state(
69		connection_channel: Sender<ResponseStreamMessage>,
70		source_address: SocketAddr,
71		stream_id: Option<u64>,
72		state: State,
73	) -> Self {
74		Self {
75			connection_channel: Some(connection_channel),
76			ext: Extensions::new(),
77			source_address,
78			stream_id,
79			state,
80		}
81	}
82
83	#[must_use]
84	pub const fn new_disconnected_with_state(
85		source_address: SocketAddr,
86		stream_id: Option<u64>,
87		state: State,
88	) -> Self {
89		Self {
90			connection_channel: None,
91			ext: Extensions::new(),
92			source_address,
93			stream_id,
94			state,
95		}
96	}
97
98	/// A unique identifier for the "stream" or connection of a packet.
99	///
100	/// In UDP which doesn't have stream this uses the source address as
101	/// the core identifier.
102	#[must_use]
103	pub fn stream_id(&self) -> u64 {
104		if let Some(id) = self.stream_id {
105			id
106		} else {
107			let mut hasher = FnvHasher::default();
108			self.source_address.hash(&mut hasher);
109			hasher.finish()
110		}
111	}
112
113	/// Allows retrieving a channel to send out-of-band packets to a connection.
114	///
115	/// This type is clone-able, and can be used without needing to hold onto this event.
116	#[must_use]
117	pub const fn out_of_band_channel(&self) -> Option<&Sender<ResponseStreamMessage>> {
118		self.connection_channel.as_ref()
119	}
120
121	#[must_use]
122	pub const fn state(&self) -> &State {
123		&self.state
124	}
125	#[must_use]
126	pub fn state_mut(&mut self) -> &mut State {
127		&mut self.state
128	}
129
130	#[must_use]
131	pub const fn extensions(&self) -> &Extensions {
132		&self.ext
133	}
134	#[must_use]
135	pub fn extensions_mut(&mut self) -> &mut Extensions {
136		&mut self.ext
137	}
138
139	#[must_use]
140	pub const fn source(&self) -> &SocketAddr {
141		&self.source_address
142	}
143	#[must_use]
144	pub fn is_ipv4(&self) -> bool {
145		self.source_address.ip().is_ipv4()
146	}
147	#[must_use]
148	pub fn is_ipv6(&self) -> bool {
149		self.source_address.ip().is_ipv6()
150	}
151}
152
153impl<State: Clone + Send + Sync + 'static> Debug for ResponseStreamEvent<State> {
154	fn fmt(&self, fmt: &mut Formatter<'_>) -> FmtResult {
155		fmt.debug_struct("ResponseStreamEvent")
156			// Extensions can't be printed in debug by hyper, and in order to keep
157			// compatability ours don't.
158			.field("source_address", &self.source_address)
159			.field("stream_id", &self.stream_id)
160			.finish_non_exhaustive()
161	}
162}
163
164const CONNECTION_EVENT_FIELDS: &[NamedField<'static>] = &[
165	NamedField::new("source_address"),
166	NamedField::new("stream_id"),
167];
168
169impl<State: Clone + Send + Sync + 'static> Structable for ResponseStreamEvent<State> {
170	fn definition(&self) -> StructDef<'_> {
171		StructDef::new_static(
172			"ResponseStreamEvent",
173			Fields::Named(CONNECTION_EVENT_FIELDS),
174		)
175	}
176}
177
178impl<State: Clone + Send + Sync + 'static> Valuable for ResponseStreamEvent<State> {
179	fn as_value(&self) -> Value<'_> {
180		Value::Structable(self)
181	}
182
183	fn visit(&self, visitor: &mut dyn Visit) {
184		visitor.visit_named_fields(&NamedValues::new(
185			CONNECTION_EVENT_FIELDS,
186			&[
187				Valuable::as_value(&format!("{}", self.source_address)),
188				Valuable::as_value(&self.stream_id),
189			],
190		));
191	}
192}
193
194/// The underlying type we use for storing your on connection handler.
195pub type UnderlyingOnStreamBeginService<State> =
196	BoxCloneService<ResponseStreamEvent<State>, bool, CatBridgeError>;
197/// The underlying type we use for storing your on disconnect handler.
198pub type UnderlyingOnStreamEndService<State> =
199	BoxCloneService<ResponseStreamEvent<State>, (), CatBridgeError>;
200
201/// Extract any value from a Connection event.
202///
203/// Mirrors [`crate::net::models::FromRequest`].
204pub trait FromResponseStreamEvent<State: Clone + Send + Sync + 'static>: Sized {
205	fn from_stream_event(
206		event: &mut ResponseStreamEvent<State>,
207	) -> impl Future<Output = Result<Self, CatBridgeError>> + Send;
208}
209
210/// A type that holds onto a [`Service`], and can call it on drop.
211///
212/// This spawns a temporary task to run the async processing, and moves the
213/// event data within.
214pub(crate) struct DisconnectAsyncDropServer<
215	ServiceTy: Clone
216		+ Service<
217			ResponseStreamEvent<State>,
218			Future = ServiceFutureTy,
219			Response = (),
220			Error = CatBridgeError,
221		> + Send
222		+ 'static,
223	ServiceFutureTy: Future<Output = Result<(), CatBridgeError>> + Send,
224	State: Clone + Send + Sync + 'static,
225> {
226	service: ServiceTy,
227	state: State,
228	source_address: SocketAddr,
229	stream_id: u64,
230}
231
232impl<
233	ServiceTy: Clone
234		+ Service<
235			ResponseStreamEvent<State>,
236			Future = ServiceFutureTy,
237			Response = (),
238			Error = CatBridgeError,
239		> + Send
240		+ 'static,
241	ServiceFutureTy: Future<Output = Result<(), CatBridgeError>> + Send,
242	State: Clone + Send + Sync + 'static,
243> DisconnectAsyncDropServer<ServiceTy, ServiceFutureTy, State>
244{
245	#[must_use]
246	pub const fn new(
247		service: ServiceTy,
248		state: State,
249		source_address: SocketAddr,
250		stream_id: u64,
251	) -> Self {
252		Self {
253			service,
254			state,
255			source_address,
256			stream_id,
257		}
258	}
259}
260
261impl<
262	ServiceTy: Clone
263		+ Service<
264			ResponseStreamEvent<State>,
265			Future = ServiceFutureTy,
266			Response = (),
267			Error = CatBridgeError,
268		> + Send
269		+ 'static,
270	ServiceFutureTy: Future<Output = Result<(), CatBridgeError>> + Send,
271	State: Clone + Send + Sync + 'static,
272> Drop for DisconnectAsyncDropServer<ServiceTy, ServiceFutureTy, State>
273{
274	fn drop(&mut self) {
275		let addr = self.source_address;
276		let mut svc = self.service.clone();
277		let state = self.state.clone();
278		let stream_id = self.stream_id;
279
280		if let Err(cause) = TaskBuilder::new().name("cat_dev::net::server::models::DisconnectAsyncDrop").spawn(async move {
281			if let Err(cause) = svc.call(
282				ResponseStreamEvent::new_disconnected_with_state(addr, Some(stream_id), state),
283			).await {
284				warn!(
285					?cause,
286					client.address = %addr,
287					server.stream_id = stream_id,
288					"On stream end task has failed during it's processing, and may need to be cleaned up manually.",
289				);
290			}
291		}) {
292			warn!(
293				?cause,
294				client.address = %addr,
295				server.stream_id = stream_id,
296				"On Stream end task has failed to be spawned, and will not be completed!",
297			);
298		}
299	}
300}