Skip to main content

reinhardt_websockets/
connection.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use tokio::sync::RwLock;
5use tokio::sync::mpsc;
6
/// Ping/pong keepalive configuration for WebSocket connections.
///
/// Controls how frequently ping frames are sent and how long
/// the server waits for a pong response before considering
/// the connection dead.
///
/// Two configurations compare equal when both durations are equal.
///
/// # Examples
///
/// ```
/// use reinhardt_websockets::connection::PingPongConfig;
/// use std::time::Duration;
///
/// // Use defaults (30s ping interval, 10s pong timeout)
/// let config = PingPongConfig::default();
/// assert_eq!(config.ping_interval(), Duration::from_secs(30));
/// assert_eq!(config.pong_timeout(), Duration::from_secs(10));
///
/// // Custom configuration
/// let config = PingPongConfig::new(
///     Duration::from_secs(15),
///     Duration::from_secs(5),
/// );
/// assert_eq!(config.ping_interval(), Duration::from_secs(15));
/// assert_eq!(config.pong_timeout(), Duration::from_secs(5));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PingPongConfig {
	/// Interval between ping frames sent to the client
	ping_interval: Duration,
	/// Maximum time to wait for a pong response before closing
	pong_timeout: Duration,
}
39
40impl Default for PingPongConfig {
41	fn default() -> Self {
42		Self {
43			ping_interval: Duration::from_secs(30),
44			pong_timeout: Duration::from_secs(10),
45		}
46	}
47}
48
49impl PingPongConfig {
50	/// Creates a new ping/pong configuration with the given intervals.
51	///
52	/// # Arguments
53	///
54	/// * `ping_interval` - How often to send ping frames
55	/// * `pong_timeout` - How long to wait for a pong response
56	///
57	/// # Examples
58	///
59	/// ```
60	/// use reinhardt_websockets::connection::PingPongConfig;
61	/// use std::time::Duration;
62	///
63	/// let config = PingPongConfig::new(
64	///     Duration::from_secs(20),
65	///     Duration::from_secs(8),
66	/// );
67	/// assert_eq!(config.ping_interval(), Duration::from_secs(20));
68	/// assert_eq!(config.pong_timeout(), Duration::from_secs(8));
69	/// ```
70	pub fn new(ping_interval: Duration, pong_timeout: Duration) -> Self {
71		Self {
72			ping_interval,
73			pong_timeout,
74		}
75	}
76
77	/// Returns the ping interval duration.
78	pub fn ping_interval(&self) -> Duration {
79		self.ping_interval
80	}
81
82	/// Returns the pong timeout duration.
83	pub fn pong_timeout(&self) -> Duration {
84		self.pong_timeout
85	}
86
87	/// Sets the ping interval duration.
88	///
89	/// # Examples
90	///
91	/// ```
92	/// use reinhardt_websockets::connection::PingPongConfig;
93	/// use std::time::Duration;
94	///
95	/// let config = PingPongConfig::default()
96	///     .with_ping_interval(Duration::from_secs(60));
97	/// assert_eq!(config.ping_interval(), Duration::from_secs(60));
98	/// ```
99	pub fn with_ping_interval(mut self, interval: Duration) -> Self {
100		self.ping_interval = interval;
101		self
102	}
103
104	/// Sets the pong timeout duration.
105	///
106	/// # Examples
107	///
108	/// ```
109	/// use reinhardt_websockets::connection::PingPongConfig;
110	/// use std::time::Duration;
111	///
112	/// let config = PingPongConfig::default()
113	///     .with_pong_timeout(Duration::from_secs(15));
114	/// assert_eq!(config.pong_timeout(), Duration::from_secs(15));
115	/// ```
116	pub fn with_pong_timeout(mut self, timeout: Duration) -> Self {
117		self.pong_timeout = timeout;
118		self
119	}
120}
121
122/// Connection timeout configuration
123///
124/// This struct defines the timeout settings for WebSocket connections
125/// to prevent resource exhaustion from idle connections.
126///
127/// # Fields
128///
129/// - `idle_timeout` - Maximum duration a connection can be idle before being closed (default: 5 minutes)
130/// - `handshake_timeout` - Maximum duration for the WebSocket handshake to complete (default: 10 seconds)
131/// - `cleanup_interval` - Interval for checking idle connections (default: 30 seconds)
132///
133/// # Examples
134///
135/// ```
136/// use reinhardt_websockets::connection::ConnectionConfig;
137/// use std::time::Duration;
138///
139/// let config = ConnectionConfig::new()
140///     .with_idle_timeout(Duration::from_secs(300))
141///     .with_handshake_timeout(Duration::from_secs(10))
142///     .with_cleanup_interval(Duration::from_secs(30));
143///
144/// assert_eq!(config.idle_timeout(), Duration::from_secs(300));
145/// assert_eq!(config.handshake_timeout(), Duration::from_secs(10));
146/// assert_eq!(config.cleanup_interval(), Duration::from_secs(30));
147/// ```
148#[derive(Debug, Clone)]
149pub struct ConnectionConfig {
150	idle_timeout: Duration,
151	handshake_timeout: Duration,
152	cleanup_interval: Duration,
153	/// Maximum number of concurrent connections allowed (None for unlimited)
154	max_connections: Option<usize>,
155	/// Ping/pong keepalive configuration
156	ping_config: PingPongConfig,
157}
158
159impl Default for ConnectionConfig {
160	fn default() -> Self {
161		Self {
162			idle_timeout: Duration::from_secs(300), // 5 minutes default
163			handshake_timeout: Duration::from_secs(10), // 10 seconds default
164			cleanup_interval: Duration::from_secs(30), // 30 seconds default
165			max_connections: None,                  // Unlimited by default
166			ping_config: PingPongConfig::default(),
167		}
168	}
169}
170
171impl ConnectionConfig {
172	/// Create a new connection configuration with default values
173	///
174	/// # Examples
175	///
176	/// ```
177	/// use reinhardt_websockets::connection::ConnectionConfig;
178	/// use std::time::Duration;
179	///
180	/// let config = ConnectionConfig::new();
181	/// assert_eq!(config.idle_timeout(), Duration::from_secs(300));
182	/// assert_eq!(config.handshake_timeout(), Duration::from_secs(10));
183	/// assert_eq!(config.cleanup_interval(), Duration::from_secs(30));
184	/// ```
185	pub fn new() -> Self {
186		Self::default()
187	}
188
189	/// Set the idle timeout duration
190	///
191	/// # Arguments
192	///
193	/// * `timeout` - Maximum duration a connection can be idle before being closed
194	///
195	/// # Examples
196	///
197	/// ```
198	/// use reinhardt_websockets::connection::ConnectionConfig;
199	/// use std::time::Duration;
200	///
201	/// let config = ConnectionConfig::new()
202	///     .with_idle_timeout(Duration::from_secs(60));
203	///
204	/// assert_eq!(config.idle_timeout(), Duration::from_secs(60));
205	/// ```
206	pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
207		self.idle_timeout = timeout;
208		self
209	}
210
211	/// Set the handshake timeout duration
212	///
213	/// # Arguments
214	///
215	/// * `timeout` - Maximum duration for the WebSocket handshake to complete
216	///
217	/// # Examples
218	///
219	/// ```
220	/// use reinhardt_websockets::connection::ConnectionConfig;
221	/// use std::time::Duration;
222	///
223	/// let config = ConnectionConfig::new()
224	///     .with_handshake_timeout(Duration::from_secs(5));
225	///
226	/// assert_eq!(config.handshake_timeout(), Duration::from_secs(5));
227	/// ```
228	pub fn with_handshake_timeout(mut self, timeout: Duration) -> Self {
229		self.handshake_timeout = timeout;
230		self
231	}
232
233	/// Set the cleanup interval for checking idle connections
234	///
235	/// # Arguments
236	///
237	/// * `interval` - How often to check for idle connections
238	///
239	/// # Examples
240	///
241	/// ```
242	/// use reinhardt_websockets::connection::ConnectionConfig;
243	/// use std::time::Duration;
244	///
245	/// let config = ConnectionConfig::new()
246	///     .with_cleanup_interval(Duration::from_secs(15));
247	///
248	/// assert_eq!(config.cleanup_interval(), Duration::from_secs(15));
249	/// ```
250	pub fn with_cleanup_interval(mut self, interval: Duration) -> Self {
251		self.cleanup_interval = interval;
252		self
253	}
254
255	/// Get the idle timeout duration
256	pub fn idle_timeout(&self) -> Duration {
257		self.idle_timeout
258	}
259
260	/// Get the handshake timeout duration
261	pub fn handshake_timeout(&self) -> Duration {
262		self.handshake_timeout
263	}
264
265	/// Get the cleanup interval duration
266	pub fn cleanup_interval(&self) -> Duration {
267		self.cleanup_interval
268	}
269
270	/// Set the maximum number of concurrent connections
271	///
272	/// # Arguments
273	///
274	/// * `max` - Maximum number of connections allowed. Use `None` for unlimited.
275	///
276	/// # Examples
277	///
278	/// ```
279	/// use reinhardt_websockets::connection::ConnectionConfig;
280	///
281	/// let config = ConnectionConfig::new()
282	///     .with_max_connections(Some(1000));
283	///
284	/// assert_eq!(config.max_connections(), Some(1000));
285	/// ```
286	pub fn with_max_connections(mut self, max: Option<usize>) -> Self {
287		self.max_connections = max;
288		self
289	}
290
291	/// Get the maximum number of concurrent connections
292	pub fn max_connections(&self) -> Option<usize> {
293		self.max_connections
294	}
295
296	/// Set the ping/pong keepalive configuration.
297	///
298	/// # Arguments
299	///
300	/// * `config` - The ping/pong configuration to use
301	///
302	/// # Examples
303	///
304	/// ```
305	/// use reinhardt_websockets::connection::{ConnectionConfig, PingPongConfig};
306	/// use std::time::Duration;
307	///
308	/// let ping_config = PingPongConfig::new(
309	///     Duration::from_secs(15),
310	///     Duration::from_secs(5),
311	/// );
312	/// let config = ConnectionConfig::new()
313	///     .with_ping_config(ping_config);
314	///
315	/// assert_eq!(config.ping_config().ping_interval(), Duration::from_secs(15));
316	/// assert_eq!(config.ping_config().pong_timeout(), Duration::from_secs(5));
317	/// ```
318	pub fn with_ping_config(mut self, config: PingPongConfig) -> Self {
319		self.ping_config = config;
320		self
321	}
322
323	/// Get the ping/pong keepalive configuration.
324	pub fn ping_config(&self) -> &PingPongConfig {
325		&self.ping_config
326	}
327
328	/// Create a configuration with no idle timeout (connections never time out)
329	///
330	/// # Examples
331	///
332	/// ```
333	/// use reinhardt_websockets::connection::ConnectionConfig;
334	///
335	/// let config = ConnectionConfig::no_timeout();
336	/// assert_eq!(config.idle_timeout(), std::time::Duration::MAX);
337	/// assert_eq!(config.handshake_timeout(), std::time::Duration::MAX);
338	/// ```
339	pub fn no_timeout() -> Self {
340		Self {
341			idle_timeout: Duration::MAX,
342			handshake_timeout: Duration::MAX,
343			cleanup_interval: Duration::from_secs(30),
344			max_connections: None,
345			ping_config: PingPongConfig::default(),
346		}
347	}
348
349	/// Create a strict configuration with short timeouts
350	///
351	/// - Idle timeout: 30 seconds
352	/// - Handshake timeout: 5 seconds
353	/// - Cleanup interval: 10 seconds
354	///
355	/// # Examples
356	///
357	/// ```
358	/// use reinhardt_websockets::connection::ConnectionConfig;
359	/// use std::time::Duration;
360	///
361	/// let config = ConnectionConfig::strict();
362	/// assert_eq!(config.idle_timeout(), Duration::from_secs(30));
363	/// assert_eq!(config.handshake_timeout(), Duration::from_secs(5));
364	/// assert_eq!(config.cleanup_interval(), Duration::from_secs(10));
365	/// ```
366	pub fn strict() -> Self {
367		Self {
368			idle_timeout: Duration::from_secs(30),
369			handshake_timeout: Duration::from_secs(5),
370			cleanup_interval: Duration::from_secs(10),
371			max_connections: None,
372			ping_config: PingPongConfig::new(Duration::from_secs(10), Duration::from_secs(5)),
373		}
374	}
375
376	/// Create a permissive configuration with long timeouts
377	///
378	/// - Idle timeout: 1 hour
379	/// - Handshake timeout: 30 seconds
380	/// - Cleanup interval: 60 seconds
381	///
382	/// # Examples
383	///
384	/// ```
385	/// use reinhardt_websockets::connection::ConnectionConfig;
386	/// use std::time::Duration;
387	///
388	/// let config = ConnectionConfig::permissive();
389	/// assert_eq!(config.idle_timeout(), Duration::from_secs(3600));
390	/// assert_eq!(config.handshake_timeout(), Duration::from_secs(30));
391	/// assert_eq!(config.cleanup_interval(), Duration::from_secs(60));
392	/// ```
393	pub fn permissive() -> Self {
394		Self {
395			idle_timeout: Duration::from_secs(3600),
396			handshake_timeout: Duration::from_secs(30),
397			cleanup_interval: Duration::from_secs(60),
398			max_connections: None,
399			ping_config: PingPongConfig::new(Duration::from_secs(60), Duration::from_secs(30)),
400		}
401	}
402}
403
404/// Errors that can occur during WebSocket operations.
405#[derive(Debug, thiserror::Error)]
406pub enum WebSocketError {
407	/// A connection-level error occurred.
408	#[error("Connection error")]
409	Connection(String),
410	/// Failed to send a message to the peer.
411	#[error("Send failed")]
412	Send(String),
413	/// Failed to receive a message from the peer.
414	#[error("Receive failed")]
415	Receive(String),
416	/// A WebSocket protocol violation was detected.
417	#[error("Protocol error")]
418	Protocol(String),
419	/// An internal server error occurred.
420	#[error("Internal error")]
421	Internal(String),
422	/// The connection timed out after the given duration of inactivity.
423	#[error("Connection timed out")]
424	Timeout(Duration),
425	/// Reconnection failed after the specified number of attempts.
426	#[error("Reconnection failed")]
427	ReconnectFailed(u32),
428	/// The binary payload was invalid or could not be decoded.
429	#[error("Invalid binary payload: {0}")]
430	BinaryPayload(String),
431	/// No pong response was received within the heartbeat timeout.
432	#[error("Heartbeat timeout: no pong received within {0:?}")]
433	HeartbeatTimeout(Duration),
434	/// The consumer could not keep up with the message rate.
435	#[error("Slow consumer: send timed out after {0:?}")]
436	SlowConsumer(Duration),
437}
438
439impl WebSocketError {
440	/// Returns a sanitized error message safe for client-facing communication.
441	///
442	/// Internal details (buffer sizes, queue depths, connection state, type names)
443	/// are stripped to prevent information leakage.
444	pub fn client_message(&self) -> &'static str {
445		match self {
446			Self::Connection(_) => "Connection error",
447			Self::Send(_) => "Failed to send message",
448			Self::Receive(_) => "Failed to receive message",
449			Self::Protocol(_) => "Protocol error",
450			Self::Internal(_) => "Internal server error",
451			Self::Timeout(_) => "Connection timed out",
452			Self::ReconnectFailed(_) => "Reconnection failed",
453			Self::BinaryPayload(_) => "Invalid message format",
454			Self::HeartbeatTimeout(_) => "Connection timed out",
455			Self::SlowConsumer(_) => "Server overloaded",
456		}
457	}
458
459	/// Returns the internal detail message for logging purposes.
460	///
461	/// This MUST NOT be sent to clients as it may contain sensitive
462	/// internal state information.
463	pub fn internal_detail(&self) -> String {
464		match self {
465			Self::Connection(msg) => format!("Connection error: {}", msg),
466			Self::Send(msg) => format!("Send error: {}", msg),
467			Self::Receive(msg) => format!("Receive error: {}", msg),
468			Self::Protocol(msg) => format!("Protocol error: {}", msg),
469			Self::Internal(msg) => format!("Internal error: {}", msg),
470			Self::Timeout(d) => format!("Connection timeout: idle for {:?}", d),
471			Self::ReconnectFailed(n) => format!("Reconnection failed after {} attempts", n),
472			Self::BinaryPayload(msg) => format!("Invalid binary payload: {}", msg),
473			Self::HeartbeatTimeout(d) => format!("Heartbeat timeout: no pong within {:?}", d),
474			Self::SlowConsumer(d) => format!("Slow consumer: send timed out after {:?}", d),
475		}
476	}
477}
478
479/// A specialized `Result` type for WebSocket operations.
480pub type WebSocketResult<T> = Result<T, WebSocketError>;
481
482/// WebSocket message types
483#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
484#[serde(tag = "type")]
485pub enum Message {
486	/// A UTF-8 text message.
487	Text {
488		/// The text content of the message.
489		data: String,
490	},
491	/// A binary message.
492	Binary {
493		/// The raw bytes of the message.
494		data: Vec<u8>,
495	},
496	/// A ping control frame.
497	Ping,
498	/// A pong control frame (response to ping).
499	Pong,
500	/// A close control frame with status code and reason.
501	Close {
502		/// The WebSocket close status code.
503		code: u16,
504		/// A human-readable reason for closing.
505		reason: String,
506	},
507}
508
509impl Message {
510	/// Creates a new text message.
511	///
512	/// # Examples
513	///
514	/// ```
515	/// use reinhardt_websockets::Message;
516	///
517	/// let msg = Message::text("Hello, World!".to_string());
518	/// match msg {
519	///     Message::Text { data } => assert_eq!(data, "Hello, World!"),
520	///     _ => panic!("Expected text message"),
521	/// }
522	/// ```
523	pub fn text(data: String) -> Self {
524		Self::Text { data }
525	}
526	/// Creates a new binary message.
527	///
528	/// # Examples
529	///
530	/// ```
531	/// use reinhardt_websockets::Message;
532	///
533	/// let data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]; // "Hello" in bytes
534	/// let msg = Message::binary(data.clone());
535	/// match msg {
536	///     Message::Binary { data: d } => assert_eq!(d, data),
537	///     _ => panic!("Expected binary message"),
538	/// }
539	/// ```
540	pub fn binary(data: Vec<u8>) -> Self {
541		Self::Binary { data }
542	}
543	/// Creates a text message containing JSON-serialized data.
544	///
545	/// # Examples
546	///
547	/// ```
548	/// use reinhardt_websockets::Message;
549	/// use serde::Serialize;
550	///
551	/// #[derive(Serialize)]
552	/// struct User {
553	///     name: String,
554	///     age: u32,
555	/// }
556	///
557	/// let user = User {
558	///     name: "Alice".to_string(),
559	///     age: 30,
560	/// };
561	///
562	/// let msg = Message::json(&user).unwrap();
563	/// match msg {
564	///     Message::Text { data } => {
565	///         assert!(data.contains("Alice"));
566	///         assert!(data.contains("30"));
567	///     },
568	///     _ => panic!("Expected text message"),
569	/// }
570	/// ```
571	pub fn json<T: serde::Serialize>(data: &T) -> WebSocketResult<Self> {
572		let json =
573			serde_json::to_string(data).map_err(|e| WebSocketError::Protocol(e.to_string()))?;
574		Ok(Self::text(json))
575	}
576	/// Parses the message content as JSON into the target type.
577	///
578	/// # Examples
579	///
580	/// ```
581	/// use reinhardt_websockets::Message;
582	/// use serde::Deserialize;
583	///
584	/// #[derive(Deserialize, Debug, PartialEq)]
585	/// struct User {
586	///     name: String,
587	///     age: u32,
588	/// }
589	///
590	/// let msg = Message::text(r#"{"name":"Bob","age":25}"#.to_string());
591	/// let user: User = msg.parse_json().unwrap();
592	/// assert_eq!(user.name, "Bob");
593	/// assert_eq!(user.age, 25);
594	/// ```
595	pub fn parse_json<T: serde::de::DeserializeOwned>(&self) -> WebSocketResult<T> {
596		match self {
597			Message::Text { data } => {
598				serde_json::from_str(data).map_err(|e| WebSocketError::Protocol(e.to_string()))
599			}
600			_ => Err(WebSocketError::Protocol("Not a text message".to_string())),
601		}
602	}
603}
604
605/// WebSocket connection with activity tracking and timeout support
606pub struct WebSocketConnection {
607	id: String,
608	tx: mpsc::UnboundedSender<Message>,
609	closed: Arc<RwLock<bool>>,
610	/// Subprotocol (negotiated protocol during WebSocket handshake)
611	subprotocol: Option<String>,
612	/// Timestamp of last activity on this connection
613	last_activity: Arc<RwLock<Instant>>,
614	/// Connection timeout configuration
615	config: ConnectionConfig,
616}
617
618impl WebSocketConnection {
619	/// Creates a new WebSocket connection with the given ID and sender.
620	///
621	/// Uses default [`ConnectionConfig`] for timeout settings.
622	///
623	/// # Examples
624	///
625	/// ```
626	/// use reinhardt_websockets::{WebSocketConnection, Message};
627	/// use tokio::sync::mpsc;
628	///
629	/// let (tx, _rx) = mpsc::unbounded_channel();
630	/// let conn = WebSocketConnection::new("connection_1".to_string(), tx);
631	/// assert_eq!(conn.id(), "connection_1");
632	/// ```
633	pub fn new(id: String, tx: mpsc::UnboundedSender<Message>) -> Self {
634		Self {
635			id,
636			tx,
637			closed: Arc::new(RwLock::new(false)),
638			subprotocol: None,
639			last_activity: Arc::new(RwLock::new(Instant::now())),
640			config: ConnectionConfig::default(),
641		}
642	}
643
644	/// Creates a new WebSocket connection with the given ID, sender, and configuration.
645	///
646	/// # Examples
647	///
648	/// ```
649	/// use reinhardt_websockets::{WebSocketConnection, Message};
650	/// use reinhardt_websockets::connection::ConnectionConfig;
651	/// use tokio::sync::mpsc;
652	/// use std::time::Duration;
653	///
654	/// let (tx, _rx) = mpsc::unbounded_channel();
655	/// let config = ConnectionConfig::new()
656	///     .with_idle_timeout(Duration::from_secs(60));
657	/// let conn = WebSocketConnection::with_config("conn_1".to_string(), tx, config);
658	/// assert_eq!(conn.id(), "conn_1");
659	/// assert_eq!(conn.config().idle_timeout(), Duration::from_secs(60));
660	/// ```
661	pub fn with_config(
662		id: String,
663		tx: mpsc::UnboundedSender<Message>,
664		config: ConnectionConfig,
665	) -> Self {
666		Self {
667			id,
668			tx,
669			closed: Arc::new(RwLock::new(false)),
670			subprotocol: None,
671			last_activity: Arc::new(RwLock::new(Instant::now())),
672			config,
673		}
674	}
675
676	/// Creates a new WebSocket connection with the given ID, sender, and subprotocol.
677	///
678	/// # Examples
679	///
680	/// ```
681	/// use reinhardt_websockets::{WebSocketConnection, Message};
682	/// use tokio::sync::mpsc;
683	///
684	/// let (tx, _rx) = mpsc::unbounded_channel();
685	/// let conn = WebSocketConnection::with_subprotocol(
686	///     "connection_1".to_string(),
687	///     tx,
688	///     Some("chat".to_string())
689	/// );
690	/// assert_eq!(conn.id(), "connection_1");
691	/// assert_eq!(conn.subprotocol(), Some("chat"));
692	/// ```
693	pub fn with_subprotocol(
694		id: String,
695		tx: mpsc::UnboundedSender<Message>,
696		subprotocol: Option<String>,
697	) -> Self {
698		Self {
699			id,
700			tx,
701			closed: Arc::new(RwLock::new(false)),
702			subprotocol,
703			last_activity: Arc::new(RwLock::new(Instant::now())),
704			config: ConnectionConfig::default(),
705		}
706	}
707
708	/// Gets the negotiated subprotocol, if any.
709	///
710	/// # Examples
711	///
712	/// ```
713	/// use reinhardt_websockets::{WebSocketConnection, Message};
714	/// use tokio::sync::mpsc;
715	///
716	/// let (tx, _rx) = mpsc::unbounded_channel();
717	/// let conn = WebSocketConnection::with_subprotocol(
718	///     "test".to_string(),
719	///     tx,
720	///     Some("chat".to_string())
721	/// );
722	/// assert_eq!(conn.subprotocol(), Some("chat"));
723	/// ```
724	pub fn subprotocol(&self) -> Option<&str> {
725		self.subprotocol.as_deref()
726	}
727
728	/// Gets the connection ID.
729	///
730	/// # Examples
731	///
732	/// ```
733	/// use reinhardt_websockets::{WebSocketConnection, Message};
734	/// use tokio::sync::mpsc;
735	///
736	/// let (tx, _rx) = mpsc::unbounded_channel();
737	/// let conn = WebSocketConnection::new("test_id".to_string(), tx);
738	/// assert_eq!(conn.id(), "test_id");
739	/// ```
740	pub fn id(&self) -> &str {
741		&self.id
742	}
743
744	/// Gets the connection timeout configuration.
745	pub fn config(&self) -> &ConnectionConfig {
746		&self.config
747	}
748
749	/// Records activity on the connection, resetting the idle timer.
750	///
751	/// This is called automatically when sending messages, but can also be called
752	/// manually to indicate that the connection is still active (e.g., when
753	/// receiving messages from the client).
754	///
755	/// # Examples
756	///
757	/// ```
758	/// use reinhardt_websockets::WebSocketConnection;
759	/// use tokio::sync::mpsc;
760	///
761	/// # tokio_test::block_on(async {
762	/// let (tx, _rx) = mpsc::unbounded_channel();
763	/// let conn = WebSocketConnection::new("test".to_string(), tx);
764	///
765	/// conn.record_activity().await;
766	/// assert!(!conn.is_idle().await);
767	/// # });
768	/// ```
769	pub async fn record_activity(&self) {
770		*self.last_activity.write().await = Instant::now();
771	}
772
773	/// Returns the duration since the last activity on this connection.
774	///
775	/// # Examples
776	///
777	/// ```
778	/// use reinhardt_websockets::WebSocketConnection;
779	/// use tokio::sync::mpsc;
780	/// use std::time::Duration;
781	///
782	/// # tokio_test::block_on(async {
783	/// let (tx, _rx) = mpsc::unbounded_channel();
784	/// let conn = WebSocketConnection::new("test".to_string(), tx);
785	///
786	/// let idle = conn.idle_duration().await;
787	/// assert!(idle < Duration::from_secs(1));
788	/// # });
789	/// ```
790	pub async fn idle_duration(&self) -> Duration {
791		self.last_activity.read().await.elapsed()
792	}
793
794	/// Checks whether this connection has exceeded its idle timeout.
795	///
796	/// # Examples
797	///
798	/// ```
799	/// use reinhardt_websockets::WebSocketConnection;
800	/// use tokio::sync::mpsc;
801	///
802	/// # tokio_test::block_on(async {
803	/// let (tx, _rx) = mpsc::unbounded_channel();
804	/// let conn = WebSocketConnection::new("test".to_string(), tx);
805	///
806	/// // A freshly created connection is not idle
807	/// assert!(!conn.is_idle().await);
808	/// # });
809	/// ```
810	pub async fn is_idle(&self) -> bool {
811		self.idle_duration().await > self.config.idle_timeout
812	}
813
814	/// Sends a message through the WebSocket connection.
815	///
816	/// Records activity on the connection when a message is sent successfully.
817	///
818	/// # Examples
819	///
820	/// ```
821	/// use reinhardt_websockets::{WebSocketConnection, Message};
822	/// use tokio::sync::mpsc;
823	///
824	/// # tokio_test::block_on(async {
825	/// let (tx, mut rx) = mpsc::unbounded_channel();
826	/// let conn = WebSocketConnection::new("test".to_string(), tx);
827	///
828	/// let message = Message::text("Hello".to_string());
829	/// conn.send(message).await.unwrap();
830	///
831	/// let received = rx.recv().await.unwrap();
832	/// assert!(matches!(received, Message::Text { .. }));
833	/// # });
834	/// ```
835	pub async fn send(&self, message: Message) -> WebSocketResult<()> {
836		if *self.closed.read().await {
837			return Err(WebSocketError::Send("Connection closed".to_string()));
838		}
839
840		let result = self
841			.tx
842			.send(message)
843			.map_err(|e| WebSocketError::Send(e.to_string()));
844
845		if result.is_ok() {
846			self.record_activity().await;
847		}
848
849		result
850	}
851	/// Sends a text message through the WebSocket connection.
852	///
853	/// # Examples
854	///
855	/// ```
856	/// use reinhardt_websockets::{WebSocketConnection, Message};
857	/// use tokio::sync::mpsc;
858	///
859	/// # tokio_test::block_on(async {
860	/// let (tx, mut rx) = mpsc::unbounded_channel();
861	/// let conn = WebSocketConnection::new("test".to_string(), tx);
862	///
863	/// conn.send_text("Hello World".to_string()).await.unwrap();
864	///
865	/// let received = rx.recv().await.unwrap();
866	/// match received {
867	///     Message::Text { data } => assert_eq!(data, "Hello World"),
868	///     _ => panic!("Expected text message"),
869	/// }
870	/// # });
871	/// ```
872	pub async fn send_text(&self, text: String) -> WebSocketResult<()> {
873		self.send(Message::text(text)).await
874	}
875	/// Sends a binary message through the WebSocket connection.
876	///
877	/// # Examples
878	///
879	/// ```
880	/// use reinhardt_websockets::{WebSocketConnection, Message};
881	/// use tokio::sync::mpsc;
882	///
883	/// # tokio_test::block_on(async {
884	/// let (tx, mut rx) = mpsc::unbounded_channel();
885	/// let conn = WebSocketConnection::new("test".to_string(), tx);
886	///
887	/// let binary_data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]; // "Hello"
888	/// conn.send_binary(binary_data.clone()).await.unwrap();
889	///
890	/// let received = rx.recv().await.unwrap();
891	/// match received {
892	///     Message::Binary { data } => assert_eq!(data, binary_data),
893	///     _ => panic!("Expected binary message"),
894	/// }
895	/// # });
896	/// ```
897	pub async fn send_binary(&self, data: Vec<u8>) -> WebSocketResult<()> {
898		self.send(Message::binary(data)).await
899	}
900	/// Sends a JSON message through the WebSocket connection.
901	///
902	/// # Examples
903	///
904	/// ```
905	/// use reinhardt_websockets::{WebSocketConnection, Message};
906	/// use tokio::sync::mpsc;
907	/// use serde::Serialize;
908	///
909	/// #[derive(Serialize)]
910	/// struct User {
911	///     name: String,
912	///     age: u32,
913	/// }
914	///
915	/// # tokio_test::block_on(async {
916	/// let (tx, mut rx) = mpsc::unbounded_channel();
917	/// let conn = WebSocketConnection::new("test".to_string(), tx);
918	///
919	/// let user = User { name: "Alice".to_string(), age: 30 };
920	/// conn.send_json(&user).await.unwrap();
921	///
922	/// let received = rx.recv().await.unwrap();
923	/// match received {
924	///     Message::Text { data } => assert!(data.contains("Alice")),
925	///     _ => panic!("Expected text message"),
926	/// }
927	/// # });
928	/// ```
929	pub async fn send_json<T: serde::Serialize>(&self, data: &T) -> WebSocketResult<()> {
930		let message = Message::json(data)?;
931		self.send(message).await
932	}
933	/// Closes the WebSocket connection.
934	///
935	/// The connection is always marked as closed regardless of whether the
936	/// close frame could be sent. This ensures resource cleanup even when
937	/// the underlying channel is already broken.
938	///
939	/// # Examples
940	///
941	/// ```
942	/// use reinhardt_websockets::{WebSocketConnection, Message};
943	/// use tokio::sync::mpsc;
944	///
945	/// # tokio_test::block_on(async {
946	/// let (tx, mut rx) = mpsc::unbounded_channel();
947	/// let conn = WebSocketConnection::new("test".to_string(), tx);
948	///
949	/// conn.close().await.unwrap();
950	/// assert!(conn.is_closed().await);
951	/// # });
952	/// ```
953	pub async fn close(&self) -> WebSocketResult<()> {
954		// Mark as closed first to prevent new sends
955		*self.closed.write().await = true;
956
957		// Best-effort send of close frame; connection is closed regardless
958		self.tx
959			.send(Message::Close {
960				code: 1000,
961				reason: "Normal closure".to_string(),
962			})
963			.map_err(|e| WebSocketError::Send(e.to_string()))
964	}
965	/// Closes the connection with a custom close code and reason.
966	///
967	/// The connection is always marked as closed regardless of whether the
968	/// close frame could be sent. This ensures resource cleanup even when
969	/// the underlying channel is already broken.
970	///
971	/// # Examples
972	///
973	/// ```
974	/// use reinhardt_websockets::{WebSocketConnection, Message};
975	/// use tokio::sync::mpsc;
976	///
977	/// # tokio_test::block_on(async {
978	/// let (tx, mut rx) = mpsc::unbounded_channel();
979	/// let conn = WebSocketConnection::new("test".to_string(), tx);
980	///
981	/// conn.close_with_reason(1001, "Idle timeout".to_string()).await.unwrap();
982	/// assert!(conn.is_closed().await);
983	///
984	/// let msg = rx.recv().await.unwrap();
985	/// match msg {
986	///     Message::Close { code, reason } => {
987	///         assert_eq!(code, 1001);
988	///         assert_eq!(reason, "Idle timeout");
989	///     },
990	///     _ => panic!("Expected close message"),
991	/// }
992	/// # });
993	/// ```
994	pub async fn close_with_reason(&self, code: u16, reason: String) -> WebSocketResult<()> {
995		// Mark as closed first to prevent new sends
996		*self.closed.write().await = true;
997
998		// Best-effort send of close frame; connection is closed regardless
999		self.tx
1000			.send(Message::Close { code, reason })
1001			.map_err(|e| WebSocketError::Send(e.to_string()))
1002	}
1003
1004	/// Forces the connection closed without sending a close frame.
1005	///
1006	/// Use this for abnormal close paths where the underlying transport is
1007	/// already broken and sending a close frame would fail.
1008	///
1009	/// # Examples
1010	///
1011	/// ```
1012	/// use reinhardt_websockets::WebSocketConnection;
1013	/// use tokio::sync::mpsc;
1014	///
1015	/// # tokio_test::block_on(async {
1016	/// let (tx, _rx) = mpsc::unbounded_channel();
1017	/// let conn = WebSocketConnection::new("test".to_string(), tx);
1018	///
1019	/// conn.force_close().await;
1020	/// assert!(conn.is_closed().await);
1021	/// # });
1022	/// ```
1023	pub async fn force_close(&self) {
1024		*self.closed.write().await = true;
1025	}
1026
1027	/// Checks if the WebSocket connection is closed.
1028	///
1029	/// # Examples
1030	///
1031	/// ```
1032	/// use reinhardt_websockets::{WebSocketConnection, Message};
1033	/// use tokio::sync::mpsc;
1034	///
1035	/// # tokio_test::block_on(async {
1036	/// let (tx, _rx) = mpsc::unbounded_channel();
1037	/// let conn = WebSocketConnection::new("test".to_string(), tx);
1038	///
1039	/// assert!(!conn.is_closed().await);
1040	/// # });
1041	/// ```
1042	pub async fn is_closed(&self) -> bool {
1043		*self.closed.read().await
1044	}
1045}
1046
1047/// Monitors WebSocket connections for idle timeouts and cleans them up.
1048///
1049/// The monitor periodically checks all registered connections and closes
1050/// those that have exceeded their idle timeout. This prevents resource
1051/// exhaustion from idle connection holding attacks.
1052///
1053/// # Examples
1054///
1055/// ```
1056/// use reinhardt_websockets::connection::{ConnectionConfig, ConnectionTimeoutMonitor};
1057/// use reinhardt_websockets::WebSocketConnection;
1058/// use tokio::sync::mpsc;
1059/// use std::sync::Arc;
1060/// use std::time::Duration;
1061///
1062/// # tokio_test::block_on(async {
1063/// let config = ConnectionConfig::new()
1064///     .with_idle_timeout(Duration::from_secs(60))
1065///     .with_cleanup_interval(Duration::from_secs(10));
1066///
1067/// let monitor = ConnectionTimeoutMonitor::new(config);
1068///
1069/// let (tx, _rx) = mpsc::unbounded_channel();
1070/// let conn = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx));
1071/// monitor.register(conn).await.unwrap();
1072///
1073/// assert_eq!(monitor.connection_count().await, 1);
1074/// # });
1075/// ```
1076pub struct ConnectionTimeoutMonitor {
1077	connections: Arc<RwLock<HashMap<String, Arc<WebSocketConnection>>>>,
1078	config: ConnectionConfig,
1079}
1080
1081impl ConnectionTimeoutMonitor {
1082	/// Creates a new connection timeout monitor with the given configuration.
1083	pub fn new(config: ConnectionConfig) -> Self {
1084		Self {
1085			connections: Arc::new(RwLock::new(HashMap::new())),
1086			config,
1087		}
1088	}
1089
1090	/// Registers a connection for timeout monitoring.
1091	///
1092	/// Returns `Ok(())` if the connection was registered, or `Err` if the
1093	/// maximum connection limit has been reached.
1094	pub async fn register(
1095		&self,
1096		connection: Arc<WebSocketConnection>,
1097	) -> Result<(), WebSocketError> {
1098		let mut connections = self.connections.write().await;
1099
1100		if let Some(max) = self.config.max_connections
1101			&& connections.len() >= max
1102		{
1103			return Err(WebSocketError::Connection(format!(
1104				"maximum connection limit reached ({})",
1105				max
1106			)));
1107		}
1108
1109		connections.insert(connection.id().to_string(), connection);
1110		Ok(())
1111	}
1112
1113	/// Unregisters a connection from timeout monitoring.
1114	pub async fn unregister(&self, connection_id: &str) {
1115		self.connections.write().await.remove(connection_id);
1116	}
1117
1118	/// Returns the number of currently monitored connections.
1119	pub async fn connection_count(&self) -> usize {
1120		self.connections.read().await.len()
1121	}
1122
1123	/// Checks all connections and closes those that have exceeded their idle timeout.
1124	///
1125	/// Returns the IDs of connections that were closed due to timeout.
1126	pub async fn check_idle_connections(&self) -> Vec<String> {
1127		let connections = self.connections.read().await;
1128		let mut timed_out = Vec::new();
1129
1130		for (id, conn) in connections.iter() {
1131			if conn.is_closed().await {
1132				timed_out.push(id.clone());
1133				continue;
1134			}
1135
1136			let idle_duration = conn.idle_duration().await;
1137			if idle_duration > self.config.idle_timeout {
1138				let reason = format!(
1139					"Idle timeout: connection idle for {}s (limit: {}s)",
1140					idle_duration.as_secs(),
1141					self.config.idle_timeout.as_secs()
1142				);
1143				// Close with 1001 (Going Away) as per RFC 6455
1144				let _ = conn.close_with_reason(1001, reason).await;
1145				timed_out.push(id.clone());
1146			}
1147		}
1148
1149		drop(connections);
1150
1151		// Remove timed-out connections
1152		if !timed_out.is_empty() {
1153			let mut connections = self.connections.write().await;
1154			for id in &timed_out {
1155				connections.remove(id);
1156			}
1157		}
1158
1159		timed_out
1160	}
1161
1162	/// Gracefully shuts down all monitored connections.
1163	///
1164	/// Sends a Close frame (code 1001, "Going Away") to each active connection
1165	/// and removes it from monitoring. Already-closed connections are silently
1166	/// removed.
1167	///
1168	/// Returns the IDs of all connections that were shut down.
1169	pub async fn shutdown_all(&self) -> Vec<String> {
1170		let mut connections = self.connections.write().await;
1171		let mut shut_down = Vec::with_capacity(connections.len());
1172
1173		for (id, conn) in connections.drain() {
1174			if !conn.is_closed().await {
1175				let _ = conn
1176					.close_with_reason(1001, "Server shutting down".to_string())
1177					.await;
1178			}
1179			shut_down.push(id);
1180		}
1181
1182		shut_down
1183	}
1184
1185	/// Starts the background monitoring task.
1186	///
1187	/// Returns a [`tokio::task::JoinHandle`] that can be used to abort the monitor.
1188	/// The monitor runs until the handle is aborted or the process exits.
1189	///
1190	/// # Examples
1191	///
1192	/// ```
1193	/// use reinhardt_websockets::connection::{ConnectionConfig, ConnectionTimeoutMonitor};
1194	/// use std::time::Duration;
1195	///
1196	/// # tokio_test::block_on(async {
1197	/// let config = ConnectionConfig::new()
1198	///     .with_cleanup_interval(Duration::from_millis(100));
1199	/// let monitor = std::sync::Arc::new(ConnectionTimeoutMonitor::new(config));
1200	///
1201	/// let handle = monitor.start();
1202	///
1203	/// // Monitor is running in background...
1204	/// tokio::time::sleep(Duration::from_millis(50)).await;
1205	///
1206	/// // Stop the monitor
1207	/// handle.abort();
1208	/// # });
1209	/// ```
1210	pub fn start(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
1211		let monitor = Arc::clone(self);
1212		tokio::spawn(async move {
1213			let mut interval = tokio::time::interval(monitor.config.cleanup_interval);
1214			loop {
1215				interval.tick().await;
1216				monitor.check_idle_connections().await;
1217			}
1218		})
1219	}
1220}
1221
/// Timing parameters for heartbeat (ping/pong) monitoring.
///
/// Holds the interval between outgoing pings and the time a pong may take
/// before the connection is treated as dead.
///
/// # Examples
///
/// ```
/// use reinhardt_websockets::connection::HeartbeatConfig;
/// use std::time::Duration;
///
/// let config = HeartbeatConfig::new(
///     Duration::from_secs(30),
///     Duration::from_secs(10),
/// );
///
/// assert_eq!(config.ping_interval(), Duration::from_secs(30));
/// assert_eq!(config.pong_timeout(), Duration::from_secs(10));
/// ```
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
	/// Time between two outgoing pings
	ping_interval: Duration,
	/// Longest acceptable wait for a pong before the connection is closed
	pong_timeout: Duration,
}

impl HeartbeatConfig {
	/// Builds a configuration from a ping interval and a pong timeout.
	pub fn new(ping_interval: Duration, pong_timeout: Duration) -> Self {
		Self {
			ping_interval,
			pong_timeout,
		}
	}

	/// Interval between outgoing pings.
	pub fn ping_interval(&self) -> Duration {
		let Self { ping_interval, .. } = self;
		*ping_interval
	}

	/// Maximum time to wait for a pong.
	pub fn pong_timeout(&self) -> Duration {
		let Self { pong_timeout, .. } = self;
		*pong_timeout
	}
}

impl Default for HeartbeatConfig {
	/// Pings every 30 seconds and allows 10 seconds for the pong.
	fn default() -> Self {
		Self::new(Duration::from_secs(30), Duration::from_secs(10))
	}
}
1277
1278/// Monitors a WebSocket connection's heartbeat via ping/pong.
1279///
1280/// Sends periodic pings and tracks when the last pong was received.
1281/// When no pong arrives within the configured timeout, the connection
1282/// is force-closed and the monitor signals a heartbeat failure.
1283///
1284/// # Examples
1285///
1286/// ```
1287/// use reinhardt_websockets::connection::{HeartbeatConfig, HeartbeatMonitor};
1288/// use reinhardt_websockets::WebSocketConnection;
1289/// use tokio::sync::mpsc;
1290/// use std::sync::Arc;
1291/// use std::time::Duration;
1292///
1293/// # tokio_test::block_on(async {
1294/// let (tx, _rx) = mpsc::unbounded_channel();
1295/// let conn = Arc::new(WebSocketConnection::new("hb_test".to_string(), tx));
1296/// let config = HeartbeatConfig::default();
1297///
1298/// let monitor = HeartbeatMonitor::new(conn, config);
1299/// assert!(!monitor.is_timed_out().await);
1300/// # });
1301/// ```
1302pub struct HeartbeatMonitor {
1303	connection: Arc<WebSocketConnection>,
1304	config: HeartbeatConfig,
1305	last_pong: Arc<RwLock<Instant>>,
1306	timed_out: Arc<RwLock<bool>>,
1307	pong_notify: Arc<tokio::sync::Notify>,
1308}
1309
1310impl HeartbeatMonitor {
1311	/// Creates a new heartbeat monitor for the given connection.
1312	pub fn new(connection: Arc<WebSocketConnection>, config: HeartbeatConfig) -> Self {
1313		Self {
1314			connection,
1315			config,
1316			last_pong: Arc::new(RwLock::new(Instant::now())),
1317			timed_out: Arc::new(RwLock::new(false)),
1318			pong_notify: Arc::new(tokio::sync::Notify::new()),
1319		}
1320	}
1321
1322	/// Records a pong response, resetting the timeout tracker.
1323	///
1324	/// This also wakes up the heartbeat monitor's sleep so it can
1325	/// proceed immediately instead of waiting for the full pong timeout.
1326	pub async fn record_pong(&self) {
1327		*self.last_pong.write().await = Instant::now();
1328		self.pong_notify.notify_one();
1329	}
1330
1331	/// Returns the duration since the last pong was received.
1332	pub async fn time_since_last_pong(&self) -> Duration {
1333		self.last_pong.read().await.elapsed()
1334	}
1335
1336	/// Returns whether the heartbeat has timed out.
1337	pub async fn is_timed_out(&self) -> bool {
1338		*self.timed_out.read().await
1339	}
1340
1341	/// Checks whether the pong timeout has been exceeded.
1342	///
1343	/// If the timeout is exceeded, the connection is force-closed and
1344	/// the method returns `true`.
1345	pub async fn check_heartbeat(&self) -> bool {
1346		let since_pong = self.time_since_last_pong().await;
1347
1348		if since_pong > self.config.pong_timeout {
1349			self.connection.force_close().await;
1350			*self.timed_out.write().await = true;
1351			return true;
1352		}
1353
1354		false
1355	}
1356
1357	/// Sends a ping message through the connection.
1358	///
1359	/// Returns `Ok(())` if the ping was sent, or an error if the
1360	/// connection is already closed.
1361	pub async fn send_ping(&self) -> WebSocketResult<()> {
1362		self.connection.send(Message::Ping).await
1363	}
1364
1365	/// Returns a reference to the heartbeat configuration.
1366	pub fn config(&self) -> &HeartbeatConfig {
1367		&self.config
1368	}
1369
1370	/// Returns a reference to the monitored connection.
1371	pub fn connection(&self) -> &Arc<WebSocketConnection> {
1372		&self.connection
1373	}
1374
1375	/// Starts a background task that periodically sends pings and checks
1376	/// for pong timeouts.
1377	///
1378	/// Returns a [`tokio::task::JoinHandle`] that can be aborted to stop
1379	/// the monitor. The task ends automatically when a heartbeat timeout
1380	/// occurs.
1381	pub fn start(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
1382		let monitor = Arc::clone(self);
1383		tokio::spawn(async move {
1384			let mut interval = tokio::time::interval(monitor.config.ping_interval);
1385			loop {
1386				interval.tick().await;
1387
1388				if monitor.connection.is_closed().await {
1389					break;
1390				}
1391
1392				// Best-effort ping; if send fails, check_heartbeat will catch it
1393				let _ = monitor.send_ping().await;
1394
1395				// Wait for pong or timeout, whichever comes first.
1396				// If pong arrives early, we skip the remaining sleep and
1397				// proceed to the next ping interval immediately.
1398				tokio::select! {
1399					() = tokio::time::sleep(monitor.config.pong_timeout) => {
1400						// Timeout elapsed without pong notification
1401						if monitor.check_heartbeat().await {
1402							break;
1403						}
1404					}
1405					() = monitor.pong_notify.notified() => {
1406						// Pong received early; no need to wait further
1407					}
1408				}
1409			}
1410		})
1411	}
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416	use super::*;
1417	use rstest::rstest;
1418
1419	#[rstest]
1420	fn test_message_text() {
1421		// Arrange
1422		let text = "Hello".to_string();
1423
1424		// Act
1425		let msg = Message::text(text);
1426
1427		// Assert
1428		match msg {
1429			Message::Text { data } => assert_eq!(data, "Hello"),
1430			_ => panic!("Expected text message"),
1431		}
1432	}
1433
1434	#[rstest]
1435	fn test_message_json() {
1436		// Arrange
1437		#[derive(serde::Serialize)]
1438		struct TestData {
1439			value: i32,
1440		}
1441		let data = TestData { value: 42 };
1442
1443		// Act
1444		let msg = Message::json(&data).unwrap();
1445
1446		// Assert
1447		match msg {
1448			Message::Text { data } => assert!(data.contains("42")),
1449			_ => panic!("Expected text message"),
1450		}
1451	}
1452
1453	#[rstest]
1454	#[tokio::test]
1455	async fn test_connection_send() {
1456		// Arrange
1457		let (tx, mut rx) = mpsc::unbounded_channel();
1458		let conn = WebSocketConnection::new("test".to_string(), tx);
1459
1460		// Act
1461		conn.send_text("Hello".to_string()).await.unwrap();
1462
1463		// Assert
1464		let received = rx.recv().await.unwrap();
1465		match received {
1466			Message::Text { data } => assert_eq!(data, "Hello"),
1467			_ => panic!("Expected text message"),
1468		}
1469	}
1470
1471	#[rstest]
1472	fn test_connection_config_default() {
1473		// Arrange & Act
1474		let config = ConnectionConfig::new();
1475
1476		// Assert
1477		assert_eq!(config.idle_timeout(), Duration::from_secs(300));
1478		assert_eq!(config.handshake_timeout(), Duration::from_secs(10));
1479		assert_eq!(config.cleanup_interval(), Duration::from_secs(30));
1480	}
1481
1482	#[rstest]
1483	fn test_connection_config_strict() {
1484		// Arrange & Act
1485		let config = ConnectionConfig::strict();
1486
1487		// Assert
1488		assert_eq!(config.idle_timeout(), Duration::from_secs(30));
1489		assert_eq!(config.handshake_timeout(), Duration::from_secs(5));
1490		assert_eq!(config.cleanup_interval(), Duration::from_secs(10));
1491	}
1492
1493	#[rstest]
1494	fn test_connection_config_permissive() {
1495		// Arrange & Act
1496		let config = ConnectionConfig::permissive();
1497
1498		// Assert
1499		assert_eq!(config.idle_timeout(), Duration::from_secs(3600));
1500		assert_eq!(config.handshake_timeout(), Duration::from_secs(30));
1501		assert_eq!(config.cleanup_interval(), Duration::from_secs(60));
1502	}
1503
1504	#[rstest]
1505	fn test_connection_config_no_timeout() {
1506		// Arrange & Act
1507		let config = ConnectionConfig::no_timeout();
1508
1509		// Assert
1510		assert_eq!(config.idle_timeout(), Duration::MAX);
1511		assert_eq!(config.handshake_timeout(), Duration::MAX);
1512	}
1513
1514	#[rstest]
1515	fn test_connection_config_builder() {
1516		// Arrange & Act
1517		let config = ConnectionConfig::new()
1518			.with_idle_timeout(Duration::from_secs(120))
1519			.with_handshake_timeout(Duration::from_secs(15))
1520			.with_cleanup_interval(Duration::from_secs(20));
1521
1522		// Assert
1523		assert_eq!(config.idle_timeout(), Duration::from_secs(120));
1524		assert_eq!(config.handshake_timeout(), Duration::from_secs(15));
1525		assert_eq!(config.cleanup_interval(), Duration::from_secs(20));
1526	}
1527
1528	#[rstest]
1529	#[tokio::test]
1530	async fn test_connection_with_config() {
1531		// Arrange
1532		let config = ConnectionConfig::new().with_idle_timeout(Duration::from_secs(60));
1533		let (tx, _rx) = mpsc::unbounded_channel();
1534
1535		// Act
1536		let conn = WebSocketConnection::with_config("test".to_string(), tx, config);
1537
1538		// Assert
1539		assert_eq!(conn.config().idle_timeout(), Duration::from_secs(60));
1540		assert!(!conn.is_idle().await);
1541	}
1542
1543	#[rstest]
1544	#[tokio::test]
1545	async fn test_connection_record_activity_resets_idle() {
1546		// Arrange
1547		let config = ConnectionConfig::new().with_idle_timeout(Duration::from_millis(50));
1548		let (tx, _rx) = mpsc::unbounded_channel();
1549		let conn = WebSocketConnection::with_config("test".to_string(), tx, config);
1550
1551		// Act - wait for connection to become idle
1552		tokio::time::sleep(Duration::from_millis(60)).await;
1553		assert!(conn.is_idle().await);
1554
1555		// Act - record activity to reset idle timer
1556		conn.record_activity().await;
1557
1558		// Assert - connection should no longer be idle
1559		assert!(!conn.is_idle().await);
1560	}
1561
1562	#[rstest]
1563	#[tokio::test]
1564	async fn test_connection_becomes_idle_after_timeout() {
1565		// Arrange
1566		let config = ConnectionConfig::new().with_idle_timeout(Duration::from_millis(50));
1567		let (tx, _rx) = mpsc::unbounded_channel();
1568		let conn = WebSocketConnection::with_config("test".to_string(), tx, config);
1569
1570		// Act - wait for connection to exceed idle timeout
1571		tokio::time::sleep(Duration::from_millis(60)).await;
1572
1573		// Assert
1574		assert!(conn.is_idle().await);
1575		assert!(conn.idle_duration().await >= Duration::from_millis(50));
1576	}
1577
1578	#[rstest]
1579	#[tokio::test]
1580	async fn test_send_resets_activity() {
1581		// Arrange
1582		let config = ConnectionConfig::new().with_idle_timeout(Duration::from_millis(100));
1583		let (tx, mut _rx) = mpsc::unbounded_channel();
1584		let conn = WebSocketConnection::with_config("test".to_string(), tx, config);
1585
1586		// Act - wait a bit then send
1587		tokio::time::sleep(Duration::from_millis(50)).await;
1588		conn.send_text("ping".to_string()).await.unwrap();
1589
1590		// Assert - activity should be recent
1591		assert!(conn.idle_duration().await < Duration::from_millis(30));
1592		assert!(!conn.is_idle().await);
1593	}
1594
1595	#[rstest]
1596	#[tokio::test]
1597	async fn test_close_with_reason() {
1598		// Arrange
1599		let (tx, mut rx) = mpsc::unbounded_channel();
1600		let conn = WebSocketConnection::new("test".to_string(), tx);
1601
1602		// Act
1603		conn.close_with_reason(1001, "Idle timeout".to_string())
1604			.await
1605			.unwrap();
1606
1607		// Assert
1608		assert!(conn.is_closed().await);
1609		let msg = rx.recv().await.unwrap();
1610		match msg {
1611			Message::Close { code, reason } => {
1612				assert_eq!(code, 1001);
1613				assert_eq!(reason, "Idle timeout");
1614			}
1615			_ => panic!("Expected close message"),
1616		}
1617	}
1618
1619	#[rstest]
1620	#[tokio::test]
1621	async fn test_timeout_monitor_register_and_count() {
1622		// Arrange
1623		let config = ConnectionConfig::new();
1624		let monitor = ConnectionTimeoutMonitor::new(config);
1625		let (tx, _rx) = mpsc::unbounded_channel();
1626		let conn = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx));
1627
1628		// Act
1629		monitor.register(conn).await.unwrap();
1630
1631		// Assert
1632		assert_eq!(monitor.connection_count().await, 1);
1633	}
1634
1635	#[rstest]
1636	#[tokio::test]
1637	async fn test_timeout_monitor_unregister() {
1638		// Arrange
1639		let config = ConnectionConfig::new();
1640		let monitor = ConnectionTimeoutMonitor::new(config);
1641		let (tx, _rx) = mpsc::unbounded_channel();
1642		let conn = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx));
1643		monitor.register(conn).await.unwrap();
1644
1645		// Act
1646		monitor.unregister("conn_1").await;
1647
1648		// Assert
1649		assert_eq!(monitor.connection_count().await, 0);
1650	}
1651
1652	#[rstest]
1653	#[tokio::test]
1654	async fn test_timeout_monitor_closes_idle_connections() {
1655		// Arrange
1656		let config = ConnectionConfig::new().with_idle_timeout(Duration::from_millis(50));
1657		let monitor = ConnectionTimeoutMonitor::new(config);
1658
1659		let (tx1, mut rx1) = mpsc::unbounded_channel();
1660		let conn1 = Arc::new(WebSocketConnection::with_config(
1661			"idle_conn".to_string(),
1662			tx1,
1663			ConnectionConfig::new().with_idle_timeout(Duration::from_millis(50)),
1664		));
1665
1666		let (tx2, _rx2) = mpsc::unbounded_channel();
1667		let conn2 = Arc::new(WebSocketConnection::with_config(
1668			"active_conn".to_string(),
1669			tx2,
1670			ConnectionConfig::new().with_idle_timeout(Duration::from_secs(300)),
1671		));
1672
1673		monitor.register(conn1).await.unwrap();
1674		monitor.register(conn2.clone()).await.unwrap();
1675
1676		// Act - wait for idle timeout to expire
1677		tokio::time::sleep(Duration::from_millis(60)).await;
1678		// Keep active connection alive
1679		conn2.record_activity().await;
1680
1681		let timed_out = monitor.check_idle_connections().await;
1682
1683		// Assert
1684		assert_eq!(timed_out.len(), 1);
1685		assert_eq!(timed_out[0], "idle_conn");
1686		assert_eq!(monitor.connection_count().await, 1);
1687
1688		// Verify the idle connection received a close message
1689		let msg = rx1.recv().await.unwrap();
1690		match msg {
1691			Message::Close { code, reason } => {
1692				assert_eq!(code, 1001);
1693				assert!(reason.contains("Idle timeout"));
1694			}
1695			_ => panic!("Expected close message for idle connection"),
1696		}
1697	}
1698
1699	#[rstest]
1700	#[tokio::test]
1701	async fn test_timeout_monitor_removes_already_closed_connections() {
1702		// Arrange
1703		let config = ConnectionConfig::new();
1704		let monitor = ConnectionTimeoutMonitor::new(config);
1705		let (tx, _rx) = mpsc::unbounded_channel();
1706		let conn = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx));
1707		conn.close().await.unwrap();
1708		monitor.register(conn).await.unwrap();
1709
1710		// Act
1711		let timed_out = monitor.check_idle_connections().await;
1712
1713		// Assert
1714		assert_eq!(timed_out.len(), 1);
1715		assert_eq!(timed_out[0], "conn_1");
1716		assert_eq!(monitor.connection_count().await, 0);
1717	}
1718
1719	#[rstest]
1720	#[tokio::test]
1721	async fn test_timeout_monitor_background_task() {
1722		// Arrange
1723		let config = ConnectionConfig::new()
1724			.with_idle_timeout(Duration::from_millis(30))
1725			.with_cleanup_interval(Duration::from_millis(20));
1726		let monitor = Arc::new(ConnectionTimeoutMonitor::new(config));
1727
1728		let (tx, mut rx) = mpsc::unbounded_channel();
1729		let conn = Arc::new(WebSocketConnection::with_config(
1730			"bg_conn".to_string(),
1731			tx,
1732			ConnectionConfig::new().with_idle_timeout(Duration::from_millis(30)),
1733		));
1734		monitor.register(conn).await.unwrap();
1735
1736		// Act - start background monitor
1737		let handle = monitor.start();
1738
1739		// Wait for the monitor to detect and close the idle connection
1740		tokio::time::sleep(Duration::from_millis(120)).await;
1741
1742		// Assert
1743		assert_eq!(monitor.connection_count().await, 0);
1744
1745		// Verify close message was sent
1746		let msg = rx.recv().await.unwrap();
1747		assert!(matches!(msg, Message::Close { .. }));
1748
1749		// Cleanup
1750		handle.abort();
1751	}
1752
1753	#[rstest]
1754	fn test_ping_pong_config_default() {
1755		// Arrange & Act
1756		let config = PingPongConfig::default();
1757
1758		// Assert
1759		assert_eq!(config.ping_interval(), Duration::from_secs(30));
1760		assert_eq!(config.pong_timeout(), Duration::from_secs(10));
1761	}
1762
1763	#[rstest]
1764	fn test_ping_pong_config_custom() {
1765		// Arrange & Act
1766		let config = PingPongConfig::new(Duration::from_secs(15), Duration::from_secs(5));
1767
1768		// Assert
1769		assert_eq!(config.ping_interval(), Duration::from_secs(15));
1770		assert_eq!(config.pong_timeout(), Duration::from_secs(5));
1771	}
1772
1773	#[rstest]
1774	fn test_ping_pong_config_builder() {
1775		// Arrange & Act
1776		let config = PingPongConfig::default()
1777			.with_ping_interval(Duration::from_secs(60))
1778			.with_pong_timeout(Duration::from_secs(20));
1779
1780		// Assert
1781		assert_eq!(config.ping_interval(), Duration::from_secs(60));
1782		assert_eq!(config.pong_timeout(), Duration::from_secs(20));
1783	}
1784
1785	#[rstest]
1786	fn test_connection_config_has_default_ping_config() {
1787		// Arrange & Act
1788		let config = ConnectionConfig::new();
1789
1790		// Assert
1791		assert_eq!(
1792			config.ping_config().ping_interval(),
1793			Duration::from_secs(30)
1794		);
1795		assert_eq!(config.ping_config().pong_timeout(), Duration::from_secs(10));
1796	}
1797
1798	#[rstest]
1799	fn test_connection_config_with_custom_ping_config() {
1800		// Arrange
1801		let ping_config = PingPongConfig::new(Duration::from_secs(15), Duration::from_secs(5));
1802
1803		// Act
1804		let config = ConnectionConfig::new().with_ping_config(ping_config);
1805
1806		// Assert
1807		assert_eq!(
1808			config.ping_config().ping_interval(),
1809			Duration::from_secs(15)
1810		);
1811		assert_eq!(config.ping_config().pong_timeout(), Duration::from_secs(5));
1812	}
1813
1814	#[rstest]
1815	fn test_strict_config_has_aggressive_ping() {
1816		// Arrange & Act
1817		let config = ConnectionConfig::strict();
1818
1819		// Assert
1820		assert_eq!(
1821			config.ping_config().ping_interval(),
1822			Duration::from_secs(10)
1823		);
1824		assert_eq!(config.ping_config().pong_timeout(), Duration::from_secs(5));
1825	}
1826
1827	#[rstest]
1828	fn test_permissive_config_has_relaxed_ping() {
1829		// Arrange & Act
1830		let config = ConnectionConfig::permissive();
1831
1832		// Assert
1833		assert_eq!(
1834			config.ping_config().ping_interval(),
1835			Duration::from_secs(60)
1836		);
1837		assert_eq!(config.ping_config().pong_timeout(), Duration::from_secs(30));
1838	}
1839
1840	#[rstest]
1841	#[tokio::test]
1842	async fn test_timeout_monitor_rejects_when_max_connections_reached() {
1843		// Arrange
1844		let config = ConnectionConfig::new().with_max_connections(Some(1));
1845		let monitor = ConnectionTimeoutMonitor::new(config);
1846
1847		let (tx1, _rx1) = mpsc::unbounded_channel();
1848		let conn1 = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx1));
1849
1850		let (tx2, _rx2) = mpsc::unbounded_channel();
1851		let conn2 = Arc::new(WebSocketConnection::new("conn_2".to_string(), tx2));
1852
1853		// Act
1854		monitor.register(conn1).await.unwrap();
1855		let result = monitor.register(conn2).await;
1856
1857		// Assert
1858		assert!(result.is_err());
1859		assert_eq!(monitor.connection_count().await, 1);
1860	}
1861
1862	#[rstest]
1863	#[tokio::test]
1864	async fn test_force_close_marks_connection_closed() {
1865		// Arrange
1866		let (tx, _rx) = mpsc::unbounded_channel();
1867		let conn = WebSocketConnection::new("test".to_string(), tx);
1868
1869		// Act
1870		conn.force_close().await;
1871
1872		// Assert
1873		assert!(conn.is_closed().await);
1874	}
1875
1876	#[rstest]
1877	#[tokio::test]
1878	async fn test_close_marks_closed_even_when_channel_dropped() {
1879		// Arrange
1880		let (tx, rx) = mpsc::unbounded_channel();
1881		let conn = WebSocketConnection::new("test".to_string(), tx);
1882
1883		// Drop receiver to simulate broken channel
1884		drop(rx);
1885
1886		// Act - close should still mark the connection as closed
1887		let result = conn.close().await;
1888
1889		// Assert
1890		assert!(result.is_err()); // send fails because receiver is dropped
1891		assert!(conn.is_closed().await); // but connection is still marked closed
1892	}
1893
1894	#[rstest]
1895	#[tokio::test]
1896	async fn test_close_with_reason_marks_closed_even_when_channel_dropped() {
1897		// Arrange
1898		let (tx, rx) = mpsc::unbounded_channel();
1899		let conn = WebSocketConnection::new("test".to_string(), tx);
1900
1901		// Drop receiver to simulate broken channel
1902		drop(rx);
1903
1904		// Act
1905		let result = conn
1906			.close_with_reason(1006, "Abnormal close".to_string())
1907			.await;
1908
1909		// Assert
1910		assert!(result.is_err());
1911		assert!(conn.is_closed().await);
1912	}
1913
1914	#[rstest]
1915	#[tokio::test]
1916	async fn test_send_after_force_close_returns_error() {
1917		// Arrange
1918		let (tx, _rx) = mpsc::unbounded_channel();
1919		let conn = WebSocketConnection::new("test".to_string(), tx);
1920		conn.force_close().await;
1921
1922		// Act
1923		let result = conn.send_text("should fail".to_string()).await;
1924
1925		// Assert
1926		assert!(result.is_err());
1927		assert!(matches!(result.unwrap_err(), WebSocketError::Send(_)));
1928	}
1929
1930	#[rstest]
1931	fn test_heartbeat_config_default() {
1932		// Arrange & Act
1933		let config = HeartbeatConfig::default();
1934
1935		// Assert
1936		assert_eq!(config.ping_interval(), Duration::from_secs(30));
1937		assert_eq!(config.pong_timeout(), Duration::from_secs(10));
1938	}
1939
1940	#[rstest]
1941	fn test_heartbeat_config_custom() {
1942		// Arrange & Act
1943		let config = HeartbeatConfig::new(Duration::from_secs(15), Duration::from_secs(5));
1944
1945		// Assert
1946		assert_eq!(config.ping_interval(), Duration::from_secs(15));
1947		assert_eq!(config.pong_timeout(), Duration::from_secs(5));
1948	}
1949
1950	#[rstest]
1951	#[tokio::test]
1952	async fn test_heartbeat_monitor_initial_state() {
1953		// Arrange
1954		let (tx, _rx) = mpsc::unbounded_channel();
1955		let conn = Arc::new(WebSocketConnection::new("hb_test".to_string(), tx));
1956		let config = HeartbeatConfig::default();
1957
1958		// Act
1959		let monitor = HeartbeatMonitor::new(conn, config);
1960
1961		// Assert
1962		assert!(!monitor.is_timed_out().await);
1963		assert!(monitor.time_since_last_pong().await < Duration::from_secs(1));
1964	}
1965
1966	#[rstest]
1967	#[tokio::test]
1968	async fn test_heartbeat_monitor_record_pong_resets_timer() {
1969		// Arrange
1970		let (tx, _rx) = mpsc::unbounded_channel();
1971		let conn = Arc::new(WebSocketConnection::new("hb_pong".to_string(), tx));
1972		let config = HeartbeatConfig::new(Duration::from_millis(50), Duration::from_millis(30));
1973		let monitor = HeartbeatMonitor::new(conn, config);
1974
1975		// Act - wait then record pong
1976		tokio::time::sleep(Duration::from_millis(20)).await;
1977		monitor.record_pong().await;
1978
1979		// Assert
1980		assert!(monitor.time_since_last_pong().await < Duration::from_millis(10));
1981	}
1982
1983	#[rstest]
1984	#[tokio::test]
1985	async fn test_heartbeat_monitor_timeout_closes_connection() {
1986		// Arrange
1987		let (tx, _rx) = mpsc::unbounded_channel();
1988		let conn = Arc::new(WebSocketConnection::new("hb_timeout".to_string(), tx));
1989		let config = HeartbeatConfig::new(Duration::from_millis(50), Duration::from_millis(30));
1990		let monitor = HeartbeatMonitor::new(conn.clone(), config);
1991
1992		// Act - wait past the pong timeout
1993		tokio::time::sleep(Duration::from_millis(40)).await;
1994		let timed_out = monitor.check_heartbeat().await;
1995
1996		// Assert
1997		assert!(timed_out);
1998		assert!(monitor.is_timed_out().await);
1999		assert!(conn.is_closed().await);
2000	}
2001
2002	#[rstest]
2003	#[tokio::test]
2004	async fn test_heartbeat_monitor_no_timeout_when_pong_received() {
2005		// Arrange
2006		let (tx, _rx) = mpsc::unbounded_channel();
2007		let conn = Arc::new(WebSocketConnection::new("hb_ok".to_string(), tx));
2008		let config = HeartbeatConfig::new(Duration::from_millis(100), Duration::from_millis(50));
2009		let monitor = HeartbeatMonitor::new(conn.clone(), config);
2010
2011		// Act - record pong within timeout window
2012		tokio::time::sleep(Duration::from_millis(20)).await;
2013		monitor.record_pong().await;
2014		let timed_out = monitor.check_heartbeat().await;
2015
2016		// Assert
2017		assert!(!timed_out);
2018		assert!(!monitor.is_timed_out().await);
2019		assert!(!conn.is_closed().await);
2020	}
2021
2022	#[rstest]
2023	#[tokio::test]
2024	async fn test_heartbeat_monitor_send_ping() {
2025		// Arrange
2026		let (tx, mut rx) = mpsc::unbounded_channel();
2027		let conn = Arc::new(WebSocketConnection::new("hb_ping".to_string(), tx));
2028		let config = HeartbeatConfig::default();
2029		let monitor = HeartbeatMonitor::new(conn, config);
2030
2031		// Act
2032		monitor.send_ping().await.unwrap();
2033
2034		// Assert
2035		let msg = rx.recv().await.unwrap();
2036		assert!(matches!(msg, Message::Ping));
2037	}
2038
2039	#[rstest]
2040	#[tokio::test]
2041	async fn test_heartbeat_monitor_early_pong_skips_full_sleep() {
2042		// Arrange
2043		let (tx, _rx) = mpsc::unbounded_channel();
2044		let conn = Arc::new(WebSocketConnection::new("hb_early".to_string(), tx));
2045		// Use a long pong_timeout to make the test obvious
2046		let config = HeartbeatConfig {
2047			ping_interval: Duration::from_secs(60),
2048			pong_timeout: Duration::from_secs(10),
2049		};
2050		let monitor = Arc::new(HeartbeatMonitor::new(conn, config));
2051
2052		// Act: simulate pong arriving after a short delay
2053		let monitor_clone = Arc::clone(&monitor);
2054		tokio::spawn(async move {
2055			tokio::time::sleep(Duration::from_millis(50)).await;
2056			monitor_clone.record_pong().await;
2057		});
2058
2059		// Send ping and wait for pong or timeout via select!
2060		let _ = monitor.send_ping().await;
2061		let start = Instant::now();
2062
2063		tokio::select! {
2064			() = tokio::time::sleep(monitor.config.pong_timeout) => {
2065				panic!("Should not reach full timeout");
2066			}
2067			() = monitor.pong_notify.notified() => {
2068				// Pong received early
2069			}
2070		}
2071
2072		// Assert: should complete well before the 10s pong_timeout
2073		let elapsed = start.elapsed();
2074		assert!(
2075			elapsed < Duration::from_secs(2),
2076			"Expected early wakeup but elapsed {:?}",
2077			elapsed
2078		);
2079	}
2080
2081	#[rstest]
2082	fn test_websocket_error_binary_payload_variant() {
2083		// Arrange & Act
2084		let err = WebSocketError::BinaryPayload("invalid data".to_string());
2085
2086		// Assert
2087		assert_eq!(err.to_string(), "Invalid binary payload: invalid data");
2088	}
2089
2090	#[rstest]
2091	fn test_websocket_error_heartbeat_timeout_variant() {
2092		// Arrange & Act
2093		let err = WebSocketError::HeartbeatTimeout(Duration::from_secs(10));
2094
2095		// Assert
2096		assert_eq!(
2097			err.to_string(),
2098			"Heartbeat timeout: no pong received within 10s"
2099		);
2100	}
2101
2102	#[rstest]
2103	fn test_websocket_error_slow_consumer_variant() {
2104		// Arrange & Act
2105		let err = WebSocketError::SlowConsumer(Duration::from_secs(5));
2106
2107		// Assert
2108		assert_eq!(err.to_string(), "Slow consumer: send timed out after 5s");
2109	}
2110}