// reinhardt_websockets/connection.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::sync::mpsc;
6
/// Ping/pong keepalive configuration for WebSocket connections.
///
/// Controls how frequently ping frames are sent and how long
/// the server waits for a pong response before considering
/// the connection dead.
///
/// This type is a plain value holder: it stores the two durations and
/// performs no validation on them.
///
/// # Examples
///
/// ```
/// use reinhardt_websockets::connection::PingPongConfig;
/// use std::time::Duration;
///
/// // Use defaults (30s ping interval, 10s pong timeout)
/// let config = PingPongConfig::default();
/// assert_eq!(config.ping_interval(), Duration::from_secs(30));
/// assert_eq!(config.pong_timeout(), Duration::from_secs(10));
///
/// // Custom configuration
/// let config = PingPongConfig::new(
///     Duration::from_secs(15),
///     Duration::from_secs(5),
/// );
/// assert_eq!(config.ping_interval(), Duration::from_secs(15));
/// assert_eq!(config.pong_timeout(), Duration::from_secs(5));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PingPongConfig {
	/// Interval between ping frames sent to the client
	ping_interval: Duration,
	/// Maximum time to wait for a pong response before closing
	pong_timeout: Duration,
}

impl Default for PingPongConfig {
	/// Returns the default keepalive settings: a 30 second ping interval
	/// and a 10 second pong timeout.
	fn default() -> Self {
		Self::new(Duration::from_secs(30), Duration::from_secs(10))
	}
}

impl PingPongConfig {
	/// Creates a new ping/pong configuration with the given intervals.
	///
	/// # Arguments
	///
	/// * `ping_interval` - How often to send ping frames
	/// * `pong_timeout` - How long to wait for a pong response
	///
	/// # Examples
	///
	/// ```
	/// use reinhardt_websockets::connection::PingPongConfig;
	/// use std::time::Duration;
	///
	/// let config = PingPongConfig::new(
	///     Duration::from_secs(20),
	///     Duration::from_secs(8),
	/// );
	/// assert_eq!(config.ping_interval(), Duration::from_secs(20));
	/// assert_eq!(config.pong_timeout(), Duration::from_secs(8));
	/// ```
	pub fn new(ping_interval: Duration, pong_timeout: Duration) -> Self {
		Self {
			ping_interval,
			pong_timeout,
		}
	}

	/// Returns the ping interval duration.
	pub fn ping_interval(&self) -> Duration {
		self.ping_interval
	}

	/// Returns the pong timeout duration.
	pub fn pong_timeout(&self) -> Duration {
		self.pong_timeout
	}

	/// Sets the ping interval duration, returning the updated configuration.
	///
	/// # Examples
	///
	/// ```
	/// use reinhardt_websockets::connection::PingPongConfig;
	/// use std::time::Duration;
	///
	/// let config = PingPongConfig::default()
	///     .with_ping_interval(Duration::from_secs(60));
	/// assert_eq!(config.ping_interval(), Duration::from_secs(60));
	/// ```
	pub fn with_ping_interval(mut self, interval: Duration) -> Self {
		self.ping_interval = interval;
		self
	}

	/// Sets the pong timeout duration, returning the updated configuration.
	///
	/// # Examples
	///
	/// ```
	/// use reinhardt_websockets::connection::PingPongConfig;
	/// use std::time::Duration;
	///
	/// let config = PingPongConfig::default()
	///     .with_pong_timeout(Duration::from_secs(15));
	/// assert_eq!(config.pong_timeout(), Duration::from_secs(15));
	/// ```
	pub fn with_pong_timeout(mut self, timeout: Duration) -> Self {
		self.pong_timeout = timeout;
		self
	}
}
121
122/// Connection timeout configuration
123///
124/// This struct defines the timeout settings for WebSocket connections
125/// to prevent resource exhaustion from idle connections.
126///
127/// # Fields
128///
129/// - `idle_timeout` - Maximum duration a connection can be idle before being closed (default: 5 minutes)
130/// - `handshake_timeout` - Maximum duration for the WebSocket handshake to complete (default: 10 seconds)
131/// - `cleanup_interval` - Interval for checking idle connections (default: 30 seconds)
132///
133/// # Examples
134///
135/// ```
136/// use reinhardt_websockets::connection::ConnectionConfig;
137/// use std::time::Duration;
138///
139/// let config = ConnectionConfig::new()
140///     .with_idle_timeout(Duration::from_secs(300))
141///     .with_handshake_timeout(Duration::from_secs(10))
142///     .with_cleanup_interval(Duration::from_secs(30));
143///
144/// assert_eq!(config.idle_timeout(), Duration::from_secs(300));
145/// assert_eq!(config.handshake_timeout(), Duration::from_secs(10));
146/// assert_eq!(config.cleanup_interval(), Duration::from_secs(30));
147/// ```
148#[derive(Debug, Clone)]
149pub struct ConnectionConfig {
150	idle_timeout: Duration,
151	handshake_timeout: Duration,
152	cleanup_interval: Duration,
153	/// Maximum number of concurrent connections allowed (None for unlimited)
154	max_connections: Option<usize>,
155	/// Ping/pong keepalive configuration
156	ping_config: PingPongConfig,
157}
158
159impl Default for ConnectionConfig {
160	fn default() -> Self {
161		Self {
162			idle_timeout: Duration::from_secs(300), // 5 minutes default
163			handshake_timeout: Duration::from_secs(10), // 10 seconds default
164			cleanup_interval: Duration::from_secs(30), // 30 seconds default
165			max_connections: None,                  // Unlimited by default
166			ping_config: PingPongConfig::default(),
167		}
168	}
169}
170
171impl ConnectionConfig {
172	/// Create a new connection configuration with default values
173	///
174	/// # Examples
175	///
176	/// ```
177	/// use reinhardt_websockets::connection::ConnectionConfig;
178	/// use std::time::Duration;
179	///
180	/// let config = ConnectionConfig::new();
181	/// assert_eq!(config.idle_timeout(), Duration::from_secs(300));
182	/// assert_eq!(config.handshake_timeout(), Duration::from_secs(10));
183	/// assert_eq!(config.cleanup_interval(), Duration::from_secs(30));
184	/// ```
185	pub fn new() -> Self {
186		Self::default()
187	}
188
189	/// Set the idle timeout duration
190	///
191	/// # Arguments
192	///
193	/// * `timeout` - Maximum duration a connection can be idle before being closed
194	///
195	/// # Examples
196	///
197	/// ```
198	/// use reinhardt_websockets::connection::ConnectionConfig;
199	/// use std::time::Duration;
200	///
201	/// let config = ConnectionConfig::new()
202	///     .with_idle_timeout(Duration::from_secs(60));
203	///
204	/// assert_eq!(config.idle_timeout(), Duration::from_secs(60));
205	/// ```
206	pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
207		self.idle_timeout = timeout;
208		self
209	}
210
211	/// Set the handshake timeout duration
212	///
213	/// # Arguments
214	///
215	/// * `timeout` - Maximum duration for the WebSocket handshake to complete
216	///
217	/// # Examples
218	///
219	/// ```
220	/// use reinhardt_websockets::connection::ConnectionConfig;
221	/// use std::time::Duration;
222	///
223	/// let config = ConnectionConfig::new()
224	///     .with_handshake_timeout(Duration::from_secs(5));
225	///
226	/// assert_eq!(config.handshake_timeout(), Duration::from_secs(5));
227	/// ```
228	pub fn with_handshake_timeout(mut self, timeout: Duration) -> Self {
229		self.handshake_timeout = timeout;
230		self
231	}
232
233	/// Set the cleanup interval for checking idle connections
234	///
235	/// # Arguments
236	///
237	/// * `interval` - How often to check for idle connections
238	///
239	/// # Examples
240	///
241	/// ```
242	/// use reinhardt_websockets::connection::ConnectionConfig;
243	/// use std::time::Duration;
244	///
245	/// let config = ConnectionConfig::new()
246	///     .with_cleanup_interval(Duration::from_secs(15));
247	///
248	/// assert_eq!(config.cleanup_interval(), Duration::from_secs(15));
249	/// ```
250	pub fn with_cleanup_interval(mut self, interval: Duration) -> Self {
251		self.cleanup_interval = interval;
252		self
253	}
254
255	/// Get the idle timeout duration
256	pub fn idle_timeout(&self) -> Duration {
257		self.idle_timeout
258	}
259
260	/// Get the handshake timeout duration
261	pub fn handshake_timeout(&self) -> Duration {
262		self.handshake_timeout
263	}
264
265	/// Get the cleanup interval duration
266	pub fn cleanup_interval(&self) -> Duration {
267		self.cleanup_interval
268	}
269
270	/// Set the maximum number of concurrent connections
271	///
272	/// # Arguments
273	///
274	/// * `max` - Maximum number of connections allowed. Use `None` for unlimited.
275	///
276	/// # Examples
277	///
278	/// ```
279	/// use reinhardt_websockets::connection::ConnectionConfig;
280	///
281	/// let config = ConnectionConfig::new()
282	///     .with_max_connections(Some(1000));
283	///
284	/// assert_eq!(config.max_connections(), Some(1000));
285	/// ```
286	pub fn with_max_connections(mut self, max: Option<usize>) -> Self {
287		self.max_connections = max;
288		self
289	}
290
291	/// Get the maximum number of concurrent connections
292	pub fn max_connections(&self) -> Option<usize> {
293		self.max_connections
294	}
295
296	/// Set the ping/pong keepalive configuration.
297	///
298	/// # Arguments
299	///
300	/// * `config` - The ping/pong configuration to use
301	///
302	/// # Examples
303	///
304	/// ```
305	/// use reinhardt_websockets::connection::{ConnectionConfig, PingPongConfig};
306	/// use std::time::Duration;
307	///
308	/// let ping_config = PingPongConfig::new(
309	///     Duration::from_secs(15),
310	///     Duration::from_secs(5),
311	/// );
312	/// let config = ConnectionConfig::new()
313	///     .with_ping_config(ping_config);
314	///
315	/// assert_eq!(config.ping_config().ping_interval(), Duration::from_secs(15));
316	/// assert_eq!(config.ping_config().pong_timeout(), Duration::from_secs(5));
317	/// ```
318	pub fn with_ping_config(mut self, config: PingPongConfig) -> Self {
319		self.ping_config = config;
320		self
321	}
322
323	/// Get the ping/pong keepalive configuration.
324	pub fn ping_config(&self) -> &PingPongConfig {
325		&self.ping_config
326	}
327
328	/// Create a configuration with no idle timeout (connections never time out)
329	///
330	/// # Examples
331	///
332	/// ```
333	/// use reinhardt_websockets::connection::ConnectionConfig;
334	///
335	/// let config = ConnectionConfig::no_timeout();
336	/// assert_eq!(config.idle_timeout(), std::time::Duration::MAX);
337	/// assert_eq!(config.handshake_timeout(), std::time::Duration::MAX);
338	/// ```
339	pub fn no_timeout() -> Self {
340		Self {
341			idle_timeout: Duration::MAX,
342			handshake_timeout: Duration::MAX,
343			cleanup_interval: Duration::from_secs(30),
344			max_connections: None,
345			ping_config: PingPongConfig::default(),
346		}
347	}
348
349	/// Create a strict configuration with short timeouts
350	///
351	/// - Idle timeout: 30 seconds
352	/// - Handshake timeout: 5 seconds
353	/// - Cleanup interval: 10 seconds
354	///
355	/// # Examples
356	///
357	/// ```
358	/// use reinhardt_websockets::connection::ConnectionConfig;
359	/// use std::time::Duration;
360	///
361	/// let config = ConnectionConfig::strict();
362	/// assert_eq!(config.idle_timeout(), Duration::from_secs(30));
363	/// assert_eq!(config.handshake_timeout(), Duration::from_secs(5));
364	/// assert_eq!(config.cleanup_interval(), Duration::from_secs(10));
365	/// ```
366	pub fn strict() -> Self {
367		Self {
368			idle_timeout: Duration::from_secs(30),
369			handshake_timeout: Duration::from_secs(5),
370			cleanup_interval: Duration::from_secs(10),
371			max_connections: None,
372			ping_config: PingPongConfig::new(Duration::from_secs(10), Duration::from_secs(5)),
373		}
374	}
375
376	/// Create a permissive configuration with long timeouts
377	///
378	/// - Idle timeout: 1 hour
379	/// - Handshake timeout: 30 seconds
380	/// - Cleanup interval: 60 seconds
381	///
382	/// # Examples
383	///
384	/// ```
385	/// use reinhardt_websockets::connection::ConnectionConfig;
386	/// use std::time::Duration;
387	///
388	/// let config = ConnectionConfig::permissive();
389	/// assert_eq!(config.idle_timeout(), Duration::from_secs(3600));
390	/// assert_eq!(config.handshake_timeout(), Duration::from_secs(30));
391	/// assert_eq!(config.cleanup_interval(), Duration::from_secs(60));
392	/// ```
393	pub fn permissive() -> Self {
394		Self {
395			idle_timeout: Duration::from_secs(3600),
396			handshake_timeout: Duration::from_secs(30),
397			cleanup_interval: Duration::from_secs(60),
398			max_connections: None,
399			ping_config: PingPongConfig::new(Duration::from_secs(60), Duration::from_secs(30)),
400		}
401	}
402}
403
404#[derive(Debug, thiserror::Error)]
405pub enum WebSocketError {
406	#[error("Connection error")]
407	Connection(String),
408	#[error("Send failed")]
409	Send(String),
410	#[error("Receive failed")]
411	Receive(String),
412	#[error("Protocol error")]
413	Protocol(String),
414	#[error("Internal error")]
415	Internal(String),
416	#[error("Connection timed out")]
417	Timeout(Duration),
418	#[error("Reconnection failed")]
419	ReconnectFailed(u32),
420	#[error("Invalid binary payload: {0}")]
421	BinaryPayload(String),
422	#[error("Heartbeat timeout: no pong received within {0:?}")]
423	HeartbeatTimeout(Duration),
424	#[error("Slow consumer: send timed out after {0:?}")]
425	SlowConsumer(Duration),
426}
427
428impl WebSocketError {
429	/// Returns a sanitized error message safe for client-facing communication.
430	///
431	/// Internal details (buffer sizes, queue depths, connection state, type names)
432	/// are stripped to prevent information leakage.
433	pub fn client_message(&self) -> &'static str {
434		match self {
435			Self::Connection(_) => "Connection error",
436			Self::Send(_) => "Failed to send message",
437			Self::Receive(_) => "Failed to receive message",
438			Self::Protocol(_) => "Protocol error",
439			Self::Internal(_) => "Internal server error",
440			Self::Timeout(_) => "Connection timed out",
441			Self::ReconnectFailed(_) => "Reconnection failed",
442			Self::BinaryPayload(_) => "Invalid message format",
443			Self::HeartbeatTimeout(_) => "Connection timed out",
444			Self::SlowConsumer(_) => "Server overloaded",
445		}
446	}
447
448	/// Returns the internal detail message for logging purposes.
449	///
450	/// This MUST NOT be sent to clients as it may contain sensitive
451	/// internal state information.
452	pub fn internal_detail(&self) -> String {
453		match self {
454			Self::Connection(msg) => format!("Connection error: {}", msg),
455			Self::Send(msg) => format!("Send error: {}", msg),
456			Self::Receive(msg) => format!("Receive error: {}", msg),
457			Self::Protocol(msg) => format!("Protocol error: {}", msg),
458			Self::Internal(msg) => format!("Internal error: {}", msg),
459			Self::Timeout(d) => format!("Connection timeout: idle for {:?}", d),
460			Self::ReconnectFailed(n) => format!("Reconnection failed after {} attempts", n),
461			Self::BinaryPayload(msg) => format!("Invalid binary payload: {}", msg),
462			Self::HeartbeatTimeout(d) => format!("Heartbeat timeout: no pong within {:?}", d),
463			Self::SlowConsumer(d) => format!("Slow consumer: send timed out after {:?}", d),
464		}
465	}
466}
467
468pub type WebSocketResult<T> = Result<T, WebSocketError>;
469
470/// WebSocket message types
471#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
472#[serde(tag = "type")]
473pub enum Message {
474	Text { data: String },
475	Binary { data: Vec<u8> },
476	Ping,
477	Pong,
478	Close { code: u16, reason: String },
479}
480
481impl Message {
482	/// Creates a new text message.
483	///
484	/// # Examples
485	///
486	/// ```
487	/// use reinhardt_websockets::Message;
488	///
489	/// let msg = Message::text("Hello, World!".to_string());
490	/// match msg {
491	///     Message::Text { data } => assert_eq!(data, "Hello, World!"),
492	///     _ => panic!("Expected text message"),
493	/// }
494	/// ```
495	pub fn text(data: String) -> Self {
496		Self::Text { data }
497	}
498	/// Creates a new binary message.
499	///
500	/// # Examples
501	///
502	/// ```
503	/// use reinhardt_websockets::Message;
504	///
505	/// let data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]; // "Hello" in bytes
506	/// let msg = Message::binary(data.clone());
507	/// match msg {
508	///     Message::Binary { data: d } => assert_eq!(d, data),
509	///     _ => panic!("Expected binary message"),
510	/// }
511	/// ```
512	pub fn binary(data: Vec<u8>) -> Self {
513		Self::Binary { data }
514	}
515	/// Creates a text message containing JSON-serialized data.
516	///
517	/// # Examples
518	///
519	/// ```
520	/// use reinhardt_websockets::Message;
521	/// use serde::Serialize;
522	///
523	/// #[derive(Serialize)]
524	/// struct User {
525	///     name: String,
526	///     age: u32,
527	/// }
528	///
529	/// let user = User {
530	///     name: "Alice".to_string(),
531	///     age: 30,
532	/// };
533	///
534	/// let msg = Message::json(&user).unwrap();
535	/// match msg {
536	///     Message::Text { data } => {
537	///         assert!(data.contains("Alice"));
538	///         assert!(data.contains("30"));
539	///     },
540	///     _ => panic!("Expected text message"),
541	/// }
542	/// ```
543	pub fn json<T: serde::Serialize>(data: &T) -> WebSocketResult<Self> {
544		let json =
545			serde_json::to_string(data).map_err(|e| WebSocketError::Protocol(e.to_string()))?;
546		Ok(Self::text(json))
547	}
548	/// Parses the message content as JSON into the target type.
549	///
550	/// # Examples
551	///
552	/// ```
553	/// use reinhardt_websockets::Message;
554	/// use serde::Deserialize;
555	///
556	/// #[derive(Deserialize, Debug, PartialEq)]
557	/// struct User {
558	///     name: String,
559	///     age: u32,
560	/// }
561	///
562	/// let msg = Message::text(r#"{"name":"Bob","age":25}"#.to_string());
563	/// let user: User = msg.parse_json().unwrap();
564	/// assert_eq!(user.name, "Bob");
565	/// assert_eq!(user.age, 25);
566	/// ```
567	pub fn parse_json<T: serde::de::DeserializeOwned>(&self) -> WebSocketResult<T> {
568		match self {
569			Message::Text { data } => {
570				serde_json::from_str(data).map_err(|e| WebSocketError::Protocol(e.to_string()))
571			}
572			_ => Err(WebSocketError::Protocol("Not a text message".to_string())),
573		}
574	}
575}
576
577/// WebSocket connection with activity tracking and timeout support
578pub struct WebSocketConnection {
579	id: String,
580	tx: mpsc::UnboundedSender<Message>,
581	closed: Arc<RwLock<bool>>,
582	/// Subprotocol (negotiated protocol during WebSocket handshake)
583	subprotocol: Option<String>,
584	/// Timestamp of last activity on this connection
585	last_activity: Arc<RwLock<Instant>>,
586	/// Connection timeout configuration
587	config: ConnectionConfig,
588}
589
590impl WebSocketConnection {
591	/// Creates a new WebSocket connection with the given ID and sender.
592	///
593	/// Uses default [`ConnectionConfig`] for timeout settings.
594	///
595	/// # Examples
596	///
597	/// ```
598	/// use reinhardt_websockets::{WebSocketConnection, Message};
599	/// use tokio::sync::mpsc;
600	///
601	/// let (tx, _rx) = mpsc::unbounded_channel();
602	/// let conn = WebSocketConnection::new("connection_1".to_string(), tx);
603	/// assert_eq!(conn.id(), "connection_1");
604	/// ```
605	pub fn new(id: String, tx: mpsc::UnboundedSender<Message>) -> Self {
606		Self {
607			id,
608			tx,
609			closed: Arc::new(RwLock::new(false)),
610			subprotocol: None,
611			last_activity: Arc::new(RwLock::new(Instant::now())),
612			config: ConnectionConfig::default(),
613		}
614	}
615
616	/// Creates a new WebSocket connection with the given ID, sender, and configuration.
617	///
618	/// # Examples
619	///
620	/// ```
621	/// use reinhardt_websockets::{WebSocketConnection, Message};
622	/// use reinhardt_websockets::connection::ConnectionConfig;
623	/// use tokio::sync::mpsc;
624	/// use std::time::Duration;
625	///
626	/// let (tx, _rx) = mpsc::unbounded_channel();
627	/// let config = ConnectionConfig::new()
628	///     .with_idle_timeout(Duration::from_secs(60));
629	/// let conn = WebSocketConnection::with_config("conn_1".to_string(), tx, config);
630	/// assert_eq!(conn.id(), "conn_1");
631	/// assert_eq!(conn.config().idle_timeout(), Duration::from_secs(60));
632	/// ```
633	pub fn with_config(
634		id: String,
635		tx: mpsc::UnboundedSender<Message>,
636		config: ConnectionConfig,
637	) -> Self {
638		Self {
639			id,
640			tx,
641			closed: Arc::new(RwLock::new(false)),
642			subprotocol: None,
643			last_activity: Arc::new(RwLock::new(Instant::now())),
644			config,
645		}
646	}
647
648	/// Creates a new WebSocket connection with the given ID, sender, and subprotocol.
649	///
650	/// # Examples
651	///
652	/// ```
653	/// use reinhardt_websockets::{WebSocketConnection, Message};
654	/// use tokio::sync::mpsc;
655	///
656	/// let (tx, _rx) = mpsc::unbounded_channel();
657	/// let conn = WebSocketConnection::with_subprotocol(
658	///     "connection_1".to_string(),
659	///     tx,
660	///     Some("chat".to_string())
661	/// );
662	/// assert_eq!(conn.id(), "connection_1");
663	/// assert_eq!(conn.subprotocol(), Some("chat"));
664	/// ```
665	pub fn with_subprotocol(
666		id: String,
667		tx: mpsc::UnboundedSender<Message>,
668		subprotocol: Option<String>,
669	) -> Self {
670		Self {
671			id,
672			tx,
673			closed: Arc::new(RwLock::new(false)),
674			subprotocol,
675			last_activity: Arc::new(RwLock::new(Instant::now())),
676			config: ConnectionConfig::default(),
677		}
678	}
679
680	/// Gets the negotiated subprotocol, if any.
681	///
682	/// # Examples
683	///
684	/// ```
685	/// use reinhardt_websockets::{WebSocketConnection, Message};
686	/// use tokio::sync::mpsc;
687	///
688	/// let (tx, _rx) = mpsc::unbounded_channel();
689	/// let conn = WebSocketConnection::with_subprotocol(
690	///     "test".to_string(),
691	///     tx,
692	///     Some("chat".to_string())
693	/// );
694	/// assert_eq!(conn.subprotocol(), Some("chat"));
695	/// ```
696	pub fn subprotocol(&self) -> Option<&str> {
697		self.subprotocol.as_deref()
698	}
699
700	/// Gets the connection ID.
701	///
702	/// # Examples
703	///
704	/// ```
705	/// use reinhardt_websockets::{WebSocketConnection, Message};
706	/// use tokio::sync::mpsc;
707	///
708	/// let (tx, _rx) = mpsc::unbounded_channel();
709	/// let conn = WebSocketConnection::new("test_id".to_string(), tx);
710	/// assert_eq!(conn.id(), "test_id");
711	/// ```
712	pub fn id(&self) -> &str {
713		&self.id
714	}
715
716	/// Gets the connection timeout configuration.
717	pub fn config(&self) -> &ConnectionConfig {
718		&self.config
719	}
720
721	/// Records activity on the connection, resetting the idle timer.
722	///
723	/// This is called automatically when sending messages, but can also be called
724	/// manually to indicate that the connection is still active (e.g., when
725	/// receiving messages from the client).
726	///
727	/// # Examples
728	///
729	/// ```
730	/// use reinhardt_websockets::WebSocketConnection;
731	/// use tokio::sync::mpsc;
732	///
733	/// # tokio_test::block_on(async {
734	/// let (tx, _rx) = mpsc::unbounded_channel();
735	/// let conn = WebSocketConnection::new("test".to_string(), tx);
736	///
737	/// conn.record_activity().await;
738	/// assert!(!conn.is_idle().await);
739	/// # });
740	/// ```
741	pub async fn record_activity(&self) {
742		*self.last_activity.write().await = Instant::now();
743	}
744
745	/// Returns the duration since the last activity on this connection.
746	///
747	/// # Examples
748	///
749	/// ```
750	/// use reinhardt_websockets::WebSocketConnection;
751	/// use tokio::sync::mpsc;
752	/// use std::time::Duration;
753	///
754	/// # tokio_test::block_on(async {
755	/// let (tx, _rx) = mpsc::unbounded_channel();
756	/// let conn = WebSocketConnection::new("test".to_string(), tx);
757	///
758	/// let idle = conn.idle_duration().await;
759	/// assert!(idle < Duration::from_secs(1));
760	/// # });
761	/// ```
762	pub async fn idle_duration(&self) -> Duration {
763		self.last_activity.read().await.elapsed()
764	}
765
766	/// Checks whether this connection has exceeded its idle timeout.
767	///
768	/// # Examples
769	///
770	/// ```
771	/// use reinhardt_websockets::WebSocketConnection;
772	/// use tokio::sync::mpsc;
773	///
774	/// # tokio_test::block_on(async {
775	/// let (tx, _rx) = mpsc::unbounded_channel();
776	/// let conn = WebSocketConnection::new("test".to_string(), tx);
777	///
778	/// // A freshly created connection is not idle
779	/// assert!(!conn.is_idle().await);
780	/// # });
781	/// ```
782	pub async fn is_idle(&self) -> bool {
783		self.idle_duration().await > self.config.idle_timeout
784	}
785
786	/// Sends a message through the WebSocket connection.
787	///
788	/// Records activity on the connection when a message is sent successfully.
789	///
790	/// # Examples
791	///
792	/// ```
793	/// use reinhardt_websockets::{WebSocketConnection, Message};
794	/// use tokio::sync::mpsc;
795	///
796	/// # tokio_test::block_on(async {
797	/// let (tx, mut rx) = mpsc::unbounded_channel();
798	/// let conn = WebSocketConnection::new("test".to_string(), tx);
799	///
800	/// let message = Message::text("Hello".to_string());
801	/// conn.send(message).await.unwrap();
802	///
803	/// let received = rx.recv().await.unwrap();
804	/// assert!(matches!(received, Message::Text { .. }));
805	/// # });
806	/// ```
807	pub async fn send(&self, message: Message) -> WebSocketResult<()> {
808		if *self.closed.read().await {
809			return Err(WebSocketError::Send("Connection closed".to_string()));
810		}
811
812		let result = self
813			.tx
814			.send(message)
815			.map_err(|e| WebSocketError::Send(e.to_string()));
816
817		if result.is_ok() {
818			self.record_activity().await;
819		}
820
821		result
822	}
823	/// Sends a text message through the WebSocket connection.
824	///
825	/// # Examples
826	///
827	/// ```
828	/// use reinhardt_websockets::{WebSocketConnection, Message};
829	/// use tokio::sync::mpsc;
830	///
831	/// # tokio_test::block_on(async {
832	/// let (tx, mut rx) = mpsc::unbounded_channel();
833	/// let conn = WebSocketConnection::new("test".to_string(), tx);
834	///
835	/// conn.send_text("Hello World".to_string()).await.unwrap();
836	///
837	/// let received = rx.recv().await.unwrap();
838	/// match received {
839	///     Message::Text { data } => assert_eq!(data, "Hello World"),
840	///     _ => panic!("Expected text message"),
841	/// }
842	/// # });
843	/// ```
844	pub async fn send_text(&self, text: String) -> WebSocketResult<()> {
845		self.send(Message::text(text)).await
846	}
847	/// Sends a binary message through the WebSocket connection.
848	///
849	/// # Examples
850	///
851	/// ```
852	/// use reinhardt_websockets::{WebSocketConnection, Message};
853	/// use tokio::sync::mpsc;
854	///
855	/// # tokio_test::block_on(async {
856	/// let (tx, mut rx) = mpsc::unbounded_channel();
857	/// let conn = WebSocketConnection::new("test".to_string(), tx);
858	///
859	/// let binary_data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]; // "Hello"
860	/// conn.send_binary(binary_data.clone()).await.unwrap();
861	///
862	/// let received = rx.recv().await.unwrap();
863	/// match received {
864	///     Message::Binary { data } => assert_eq!(data, binary_data),
865	///     _ => panic!("Expected binary message"),
866	/// }
867	/// # });
868	/// ```
869	pub async fn send_binary(&self, data: Vec<u8>) -> WebSocketResult<()> {
870		self.send(Message::binary(data)).await
871	}
872	/// Sends a JSON message through the WebSocket connection.
873	///
874	/// # Examples
875	///
876	/// ```
877	/// use reinhardt_websockets::{WebSocketConnection, Message};
878	/// use tokio::sync::mpsc;
879	/// use serde::Serialize;
880	///
881	/// #[derive(Serialize)]
882	/// struct User {
883	///     name: String,
884	///     age: u32,
885	/// }
886	///
887	/// # tokio_test::block_on(async {
888	/// let (tx, mut rx) = mpsc::unbounded_channel();
889	/// let conn = WebSocketConnection::new("test".to_string(), tx);
890	///
891	/// let user = User { name: "Alice".to_string(), age: 30 };
892	/// conn.send_json(&user).await.unwrap();
893	///
894	/// let received = rx.recv().await.unwrap();
895	/// match received {
896	///     Message::Text { data } => assert!(data.contains("Alice")),
897	///     _ => panic!("Expected text message"),
898	/// }
899	/// # });
900	/// ```
901	pub async fn send_json<T: serde::Serialize>(&self, data: &T) -> WebSocketResult<()> {
902		let message = Message::json(data)?;
903		self.send(message).await
904	}
905	/// Closes the WebSocket connection.
906	///
907	/// The connection is always marked as closed regardless of whether the
908	/// close frame could be sent. This ensures resource cleanup even when
909	/// the underlying channel is already broken.
910	///
911	/// # Examples
912	///
913	/// ```
914	/// use reinhardt_websockets::{WebSocketConnection, Message};
915	/// use tokio::sync::mpsc;
916	///
917	/// # tokio_test::block_on(async {
918	/// let (tx, mut rx) = mpsc::unbounded_channel();
919	/// let conn = WebSocketConnection::new("test".to_string(), tx);
920	///
921	/// conn.close().await.unwrap();
922	/// assert!(conn.is_closed().await);
923	/// # });
924	/// ```
925	pub async fn close(&self) -> WebSocketResult<()> {
926		// Mark as closed first to prevent new sends
927		*self.closed.write().await = true;
928
929		// Best-effort send of close frame; connection is closed regardless
930		self.tx
931			.send(Message::Close {
932				code: 1000,
933				reason: "Normal closure".to_string(),
934			})
935			.map_err(|e| WebSocketError::Send(e.to_string()))
936	}
937	/// Closes the connection with a custom close code and reason.
938	///
939	/// The connection is always marked as closed regardless of whether the
940	/// close frame could be sent. This ensures resource cleanup even when
941	/// the underlying channel is already broken.
942	///
943	/// # Examples
944	///
945	/// ```
946	/// use reinhardt_websockets::{WebSocketConnection, Message};
947	/// use tokio::sync::mpsc;
948	///
949	/// # tokio_test::block_on(async {
950	/// let (tx, mut rx) = mpsc::unbounded_channel();
951	/// let conn = WebSocketConnection::new("test".to_string(), tx);
952	///
953	/// conn.close_with_reason(1001, "Idle timeout".to_string()).await.unwrap();
954	/// assert!(conn.is_closed().await);
955	///
956	/// let msg = rx.recv().await.unwrap();
957	/// match msg {
958	///     Message::Close { code, reason } => {
959	///         assert_eq!(code, 1001);
960	///         assert_eq!(reason, "Idle timeout");
961	///     },
962	///     _ => panic!("Expected close message"),
963	/// }
964	/// # });
965	/// ```
966	pub async fn close_with_reason(&self, code: u16, reason: String) -> WebSocketResult<()> {
967		// Mark as closed first to prevent new sends
968		*self.closed.write().await = true;
969
970		// Best-effort send of close frame; connection is closed regardless
971		self.tx
972			.send(Message::Close { code, reason })
973			.map_err(|e| WebSocketError::Send(e.to_string()))
974	}
975
976	/// Forces the connection closed without sending a close frame.
977	///
978	/// Use this for abnormal close paths where the underlying transport is
979	/// already broken and sending a close frame would fail.
980	///
981	/// # Examples
982	///
983	/// ```
984	/// use reinhardt_websockets::WebSocketConnection;
985	/// use tokio::sync::mpsc;
986	///
987	/// # tokio_test::block_on(async {
988	/// let (tx, _rx) = mpsc::unbounded_channel();
989	/// let conn = WebSocketConnection::new("test".to_string(), tx);
990	///
991	/// conn.force_close().await;
992	/// assert!(conn.is_closed().await);
993	/// # });
994	/// ```
995	pub async fn force_close(&self) {
996		*self.closed.write().await = true;
997	}
998
999	/// Checks if the WebSocket connection is closed.
1000	///
1001	/// # Examples
1002	///
1003	/// ```
1004	/// use reinhardt_websockets::{WebSocketConnection, Message};
1005	/// use tokio::sync::mpsc;
1006	///
1007	/// # tokio_test::block_on(async {
1008	/// let (tx, _rx) = mpsc::unbounded_channel();
1009	/// let conn = WebSocketConnection::new("test".to_string(), tx);
1010	///
1011	/// assert!(!conn.is_closed().await);
1012	/// # });
1013	/// ```
1014	pub async fn is_closed(&self) -> bool {
1015		*self.closed.read().await
1016	}
1017}
1018
1019/// Monitors WebSocket connections for idle timeouts and cleans them up.
1020///
1021/// The monitor periodically checks all registered connections and closes
1022/// those that have exceeded their idle timeout. This prevents resource
1023/// exhaustion from idle connection holding attacks.
1024///
1025/// # Examples
1026///
1027/// ```
1028/// use reinhardt_websockets::connection::{ConnectionConfig, ConnectionTimeoutMonitor};
1029/// use reinhardt_websockets::WebSocketConnection;
1030/// use tokio::sync::mpsc;
1031/// use std::sync::Arc;
1032/// use std::time::Duration;
1033///
1034/// # tokio_test::block_on(async {
1035/// let config = ConnectionConfig::new()
1036///     .with_idle_timeout(Duration::from_secs(60))
1037///     .with_cleanup_interval(Duration::from_secs(10));
1038///
1039/// let monitor = ConnectionTimeoutMonitor::new(config);
1040///
1041/// let (tx, _rx) = mpsc::unbounded_channel();
1042/// let conn = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx));
1043/// monitor.register(conn).await.unwrap();
1044///
1045/// assert_eq!(monitor.connection_count().await, 1);
1046/// # });
1047/// ```
1048pub struct ConnectionTimeoutMonitor {
1049	connections: Arc<RwLock<HashMap<String, Arc<WebSocketConnection>>>>,
1050	config: ConnectionConfig,
1051}
1052
1053impl ConnectionTimeoutMonitor {
1054	/// Creates a new connection timeout monitor with the given configuration.
1055	pub fn new(config: ConnectionConfig) -> Self {
1056		Self {
1057			connections: Arc::new(RwLock::new(HashMap::new())),
1058			config,
1059		}
1060	}
1061
1062	/// Registers a connection for timeout monitoring.
1063	///
1064	/// Returns `Ok(())` if the connection was registered, or `Err` if the
1065	/// maximum connection limit has been reached.
1066	pub async fn register(
1067		&self,
1068		connection: Arc<WebSocketConnection>,
1069	) -> Result<(), WebSocketError> {
1070		let mut connections = self.connections.write().await;
1071
1072		if let Some(max) = self.config.max_connections
1073			&& connections.len() >= max
1074		{
1075			return Err(WebSocketError::Connection(format!(
1076				"maximum connection limit reached ({})",
1077				max
1078			)));
1079		}
1080
1081		connections.insert(connection.id().to_string(), connection);
1082		Ok(())
1083	}
1084
1085	/// Unregisters a connection from timeout monitoring.
1086	pub async fn unregister(&self, connection_id: &str) {
1087		self.connections.write().await.remove(connection_id);
1088	}
1089
1090	/// Returns the number of currently monitored connections.
1091	pub async fn connection_count(&self) -> usize {
1092		self.connections.read().await.len()
1093	}
1094
1095	/// Checks all connections and closes those that have exceeded their idle timeout.
1096	///
1097	/// Returns the IDs of connections that were closed due to timeout.
1098	pub async fn check_idle_connections(&self) -> Vec<String> {
1099		let connections = self.connections.read().await;
1100		let mut timed_out = Vec::new();
1101
1102		for (id, conn) in connections.iter() {
1103			if conn.is_closed().await {
1104				timed_out.push(id.clone());
1105				continue;
1106			}
1107
1108			let idle_duration = conn.idle_duration().await;
1109			if idle_duration > self.config.idle_timeout {
1110				let reason = format!(
1111					"Idle timeout: connection idle for {}s (limit: {}s)",
1112					idle_duration.as_secs(),
1113					self.config.idle_timeout.as_secs()
1114				);
1115				// Close with 1001 (Going Away) as per RFC 6455
1116				let _ = conn.close_with_reason(1001, reason).await;
1117				timed_out.push(id.clone());
1118			}
1119		}
1120
1121		drop(connections);
1122
1123		// Remove timed-out connections
1124		if !timed_out.is_empty() {
1125			let mut connections = self.connections.write().await;
1126			for id in &timed_out {
1127				connections.remove(id);
1128			}
1129		}
1130
1131		timed_out
1132	}
1133
1134	/// Gracefully shuts down all monitored connections.
1135	///
1136	/// Sends a Close frame (code 1001, "Going Away") to each active connection
1137	/// and removes it from monitoring. Already-closed connections are silently
1138	/// removed.
1139	///
1140	/// Returns the IDs of all connections that were shut down.
1141	pub async fn shutdown_all(&self) -> Vec<String> {
1142		let mut connections = self.connections.write().await;
1143		let mut shut_down = Vec::with_capacity(connections.len());
1144
1145		for (id, conn) in connections.drain() {
1146			if !conn.is_closed().await {
1147				let _ = conn
1148					.close_with_reason(1001, "Server shutting down".to_string())
1149					.await;
1150			}
1151			shut_down.push(id);
1152		}
1153
1154		shut_down
1155	}
1156
1157	/// Starts the background monitoring task.
1158	///
1159	/// Returns a [`tokio::task::JoinHandle`] that can be used to abort the monitor.
1160	/// The monitor runs until the handle is aborted or the process exits.
1161	///
1162	/// # Examples
1163	///
1164	/// ```
1165	/// use reinhardt_websockets::connection::{ConnectionConfig, ConnectionTimeoutMonitor};
1166	/// use std::time::Duration;
1167	///
1168	/// # tokio_test::block_on(async {
1169	/// let config = ConnectionConfig::new()
1170	///     .with_cleanup_interval(Duration::from_millis(100));
1171	/// let monitor = std::sync::Arc::new(ConnectionTimeoutMonitor::new(config));
1172	///
1173	/// let handle = monitor.start();
1174	///
1175	/// // Monitor is running in background...
1176	/// tokio::time::sleep(Duration::from_millis(50)).await;
1177	///
1178	/// // Stop the monitor
1179	/// handle.abort();
1180	/// # });
1181	/// ```
1182	pub fn start(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
1183		let monitor = Arc::clone(self);
1184		tokio::spawn(async move {
1185			let mut interval = tokio::time::interval(monitor.config.cleanup_interval);
1186			loop {
1187				interval.tick().await;
1188				monitor.check_idle_connections().await;
1189			}
1190		})
1191	}
1192}
1193
/// Configuration for heartbeat (ping/pong) monitoring.
///
/// Defines how often pings are sent and how long to wait for a pong
/// response before considering the connection dead. The defaults are a
/// 30 second ping interval and a 10 second pong timeout.
///
/// # Examples
///
/// ```
/// use reinhardt_websockets::connection::HeartbeatConfig;
/// use std::time::Duration;
///
/// let config = HeartbeatConfig::new(
///     Duration::from_secs(30),
///     Duration::from_secs(10),
/// );
///
/// assert_eq!(config.ping_interval(), Duration::from_secs(30));
/// assert_eq!(config.pong_timeout(), Duration::from_secs(10));
///
/// // Builder-style overrides, mirroring `PingPongConfig`
/// let config = HeartbeatConfig::default()
///     .with_ping_interval(Duration::from_secs(60));
/// assert_eq!(config.ping_interval(), Duration::from_secs(60));
/// assert_eq!(config.pong_timeout(), Duration::from_secs(10));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeartbeatConfig {
	/// Interval between outgoing pings
	ping_interval: Duration,
	/// Maximum time to wait for a pong response before closing
	pong_timeout: Duration,
}

impl HeartbeatConfig {
	/// Creates a new heartbeat configuration.
	///
	/// # Arguments
	///
	/// * `ping_interval` - How often to send pings
	/// * `pong_timeout` - How long to wait for a pong response
	pub fn new(ping_interval: Duration, pong_timeout: Duration) -> Self {
		Self {
			ping_interval,
			pong_timeout,
		}
	}

	/// Returns this configuration with the ping interval replaced.
	#[must_use]
	pub fn with_ping_interval(mut self, ping_interval: Duration) -> Self {
		self.ping_interval = ping_interval;
		self
	}

	/// Returns this configuration with the pong timeout replaced.
	#[must_use]
	pub fn with_pong_timeout(mut self, pong_timeout: Duration) -> Self {
		self.pong_timeout = pong_timeout;
		self
	}

	/// Returns the ping interval.
	pub fn ping_interval(&self) -> Duration {
		self.ping_interval
	}

	/// Returns the pong timeout.
	pub fn pong_timeout(&self) -> Duration {
		self.pong_timeout
	}
}

impl Default for HeartbeatConfig {
	/// Returns a configuration with a 30 second ping interval and a
	/// 10 second pong timeout.
	fn default() -> Self {
		Self::new(Duration::from_secs(30), Duration::from_secs(10))
	}
}
1249
1250/// Monitors a WebSocket connection's heartbeat via ping/pong.
1251///
1252/// Sends periodic pings and tracks when the last pong was received.
1253/// When no pong arrives within the configured timeout, the connection
1254/// is force-closed and the monitor signals a heartbeat failure.
1255///
1256/// # Examples
1257///
1258/// ```
1259/// use reinhardt_websockets::connection::{HeartbeatConfig, HeartbeatMonitor};
1260/// use reinhardt_websockets::WebSocketConnection;
1261/// use tokio::sync::mpsc;
1262/// use std::sync::Arc;
1263/// use std::time::Duration;
1264///
1265/// # tokio_test::block_on(async {
1266/// let (tx, _rx) = mpsc::unbounded_channel();
1267/// let conn = Arc::new(WebSocketConnection::new("hb_test".to_string(), tx));
1268/// let config = HeartbeatConfig::default();
1269///
1270/// let monitor = HeartbeatMonitor::new(conn, config);
1271/// assert!(!monitor.is_timed_out().await);
1272/// # });
1273/// ```
1274pub struct HeartbeatMonitor {
1275	connection: Arc<WebSocketConnection>,
1276	config: HeartbeatConfig,
1277	last_pong: Arc<RwLock<Instant>>,
1278	timed_out: Arc<RwLock<bool>>,
1279}
1280
1281impl HeartbeatMonitor {
1282	/// Creates a new heartbeat monitor for the given connection.
1283	pub fn new(connection: Arc<WebSocketConnection>, config: HeartbeatConfig) -> Self {
1284		Self {
1285			connection,
1286			config,
1287			last_pong: Arc::new(RwLock::new(Instant::now())),
1288			timed_out: Arc::new(RwLock::new(false)),
1289		}
1290	}
1291
1292	/// Records a pong response, resetting the timeout tracker.
1293	pub async fn record_pong(&self) {
1294		*self.last_pong.write().await = Instant::now();
1295	}
1296
1297	/// Returns the duration since the last pong was received.
1298	pub async fn time_since_last_pong(&self) -> Duration {
1299		self.last_pong.read().await.elapsed()
1300	}
1301
1302	/// Returns whether the heartbeat has timed out.
1303	pub async fn is_timed_out(&self) -> bool {
1304		*self.timed_out.read().await
1305	}
1306
1307	/// Checks whether the pong timeout has been exceeded.
1308	///
1309	/// If the timeout is exceeded, the connection is force-closed and
1310	/// the method returns `true`.
1311	pub async fn check_heartbeat(&self) -> bool {
1312		let since_pong = self.time_since_last_pong().await;
1313
1314		if since_pong > self.config.pong_timeout {
1315			self.connection.force_close().await;
1316			*self.timed_out.write().await = true;
1317			return true;
1318		}
1319
1320		false
1321	}
1322
1323	/// Sends a ping message through the connection.
1324	///
1325	/// Returns `Ok(())` if the ping was sent, or an error if the
1326	/// connection is already closed.
1327	pub async fn send_ping(&self) -> WebSocketResult<()> {
1328		self.connection.send(Message::Ping).await
1329	}
1330
1331	/// Returns a reference to the heartbeat configuration.
1332	pub fn config(&self) -> &HeartbeatConfig {
1333		&self.config
1334	}
1335
1336	/// Returns a reference to the monitored connection.
1337	pub fn connection(&self) -> &Arc<WebSocketConnection> {
1338		&self.connection
1339	}
1340
1341	/// Starts a background task that periodically sends pings and checks
1342	/// for pong timeouts.
1343	///
1344	/// Returns a [`tokio::task::JoinHandle`] that can be aborted to stop
1345	/// the monitor. The task ends automatically when a heartbeat timeout
1346	/// occurs.
1347	pub fn start(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
1348		let monitor = Arc::clone(self);
1349		tokio::spawn(async move {
1350			let mut interval = tokio::time::interval(monitor.config.ping_interval);
1351			loop {
1352				interval.tick().await;
1353
1354				if monitor.connection.is_closed().await {
1355					break;
1356				}
1357
1358				// Best-effort ping; if send fails, check_heartbeat will catch it
1359				let _ = monitor.send_ping().await;
1360
1361				// Wait for pong timeout period, then check
1362				tokio::time::sleep(monitor.config.pong_timeout).await;
1363
1364				if monitor.check_heartbeat().await {
1365					break;
1366				}
1367			}
1368		})
1369	}
1370}
1371
#[cfg(test)]
mod tests {
	use super::*;
	use rstest::rstest;

	/// Shorthand for a whole-second duration.
	fn secs(value: u64) -> Duration {
		Duration::from_secs(value)
	}

	/// Shorthand for a millisecond duration.
	fn millis(value: u64) -> Duration {
		Duration::from_millis(value)
	}

	// ---- Message constructors -------------------------------------------

	#[rstest]
	fn test_message_text() {
		let built = Message::text("Hello".to_string());

		let Message::Text { data } = built else {
			panic!("Expected text message");
		};
		assert_eq!(data, "Hello");
	}

	#[rstest]
	fn test_message_json() {
		#[derive(serde::Serialize)]
		struct TestData {
			value: i32,
		}

		let encoded = Message::json(&TestData { value: 42 }).unwrap();

		let Message::Text { data } = encoded else {
			panic!("Expected text message");
		};
		assert!(data.contains("42"));
	}

	// ---- Basic sending ----------------------------------------------------

	#[rstest]
	#[tokio::test]
	async fn test_connection_send() {
		let (tx, mut rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::new("test".to_string(), tx);

		conn.send_text("Hello".to_string()).await.unwrap();

		let Message::Text { data } = rx.recv().await.unwrap() else {
			panic!("Expected text message");
		};
		assert_eq!(data, "Hello");
	}

	// ---- ConnectionConfig presets and builder -----------------------------

	#[rstest]
	fn test_connection_config_default() {
		let defaults = ConnectionConfig::new();

		assert_eq!(defaults.idle_timeout(), secs(300));
		assert_eq!(defaults.handshake_timeout(), secs(10));
		assert_eq!(defaults.cleanup_interval(), secs(30));
	}

	#[rstest]
	fn test_connection_config_strict() {
		let strict = ConnectionConfig::strict();

		assert_eq!(strict.idle_timeout(), secs(30));
		assert_eq!(strict.handshake_timeout(), secs(5));
		assert_eq!(strict.cleanup_interval(), secs(10));
	}

	#[rstest]
	fn test_connection_config_permissive() {
		let permissive = ConnectionConfig::permissive();

		assert_eq!(permissive.idle_timeout(), secs(3600));
		assert_eq!(permissive.handshake_timeout(), secs(30));
		assert_eq!(permissive.cleanup_interval(), secs(60));
	}

	#[rstest]
	fn test_connection_config_no_timeout() {
		let unlimited = ConnectionConfig::no_timeout();

		assert_eq!(unlimited.idle_timeout(), Duration::MAX);
		assert_eq!(unlimited.handshake_timeout(), Duration::MAX);
	}

	#[rstest]
	fn test_connection_config_builder() {
		let built = ConnectionConfig::new()
			.with_idle_timeout(secs(120))
			.with_handshake_timeout(secs(15))
			.with_cleanup_interval(secs(20));

		assert_eq!(built.idle_timeout(), secs(120));
		assert_eq!(built.handshake_timeout(), secs(15));
		assert_eq!(built.cleanup_interval(), secs(20));
	}

	// ---- Idle tracking on a single connection -----------------------------

	#[rstest]
	#[tokio::test]
	async fn test_connection_with_config() {
		let idle_limit = secs(60);
		let (tx, _rx) = mpsc::unbounded_channel();

		let conn = WebSocketConnection::with_config(
			"test".to_string(),
			tx,
			ConnectionConfig::new().with_idle_timeout(idle_limit),
		);

		assert_eq!(conn.config().idle_timeout(), idle_limit);
		assert!(!conn.is_idle().await);
	}

	#[rstest]
	#[tokio::test]
	async fn test_connection_record_activity_resets_idle() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::with_config(
			"test".to_string(),
			tx,
			ConnectionConfig::new().with_idle_timeout(millis(50)),
		);

		// Let the idle timeout elapse first.
		tokio::time::sleep(millis(60)).await;
		assert!(conn.is_idle().await);

		// Fresh activity must clear the idle state.
		conn.record_activity().await;
		assert!(!conn.is_idle().await);
	}

	#[rstest]
	#[tokio::test]
	async fn test_connection_becomes_idle_after_timeout() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::with_config(
			"test".to_string(),
			tx,
			ConnectionConfig::new().with_idle_timeout(millis(50)),
		);

		// Sleep past the idle timeout.
		tokio::time::sleep(millis(60)).await;

		assert!(conn.is_idle().await);
		assert!(conn.idle_duration().await >= millis(50));
	}

	#[rstest]
	#[tokio::test]
	async fn test_send_resets_activity() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::with_config(
			"test".to_string(),
			tx,
			ConnectionConfig::new().with_idle_timeout(millis(100)),
		);

		// Accumulate some idle time, then send.
		tokio::time::sleep(millis(50)).await;
		conn.send_text("ping".to_string()).await.unwrap();

		// The send counts as activity, so the idle clock restarted.
		assert!(conn.idle_duration().await < millis(30));
		assert!(!conn.is_idle().await);
	}

	// ---- Closing ------------------------------------------------------------

	#[rstest]
	#[tokio::test]
	async fn test_close_with_reason() {
		let (tx, mut rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::new("test".to_string(), tx);

		conn.close_with_reason(1001, "Idle timeout".to_string())
			.await
			.unwrap();

		assert!(conn.is_closed().await);
		let Message::Close { code, reason } = rx.recv().await.unwrap() else {
			panic!("Expected close message");
		};
		assert_eq!(code, 1001);
		assert_eq!(reason, "Idle timeout");
	}

	// ---- ConnectionTimeoutMonitor -------------------------------------------

	#[rstest]
	#[tokio::test]
	async fn test_timeout_monitor_register_and_count() {
		let monitor = ConnectionTimeoutMonitor::new(ConnectionConfig::new());
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx));

		monitor.register(conn).await.unwrap();

		assert_eq!(monitor.connection_count().await, 1);
	}

	#[rstest]
	#[tokio::test]
	async fn test_timeout_monitor_unregister() {
		let monitor = ConnectionTimeoutMonitor::new(ConnectionConfig::new());
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx));
		monitor.register(conn).await.unwrap();

		monitor.unregister("conn_1").await;

		assert_eq!(monitor.connection_count().await, 0);
	}

	#[rstest]
	#[tokio::test]
	async fn test_timeout_monitor_closes_idle_connections() {
		let monitor = ConnectionTimeoutMonitor::new(
			ConnectionConfig::new().with_idle_timeout(millis(50)),
		);

		let (idle_tx, mut idle_rx) = mpsc::unbounded_channel();
		let idle_conn = Arc::new(WebSocketConnection::with_config(
			"idle_conn".to_string(),
			idle_tx,
			ConnectionConfig::new().with_idle_timeout(millis(50)),
		));

		let (active_tx, _active_rx) = mpsc::unbounded_channel();
		let active_conn = Arc::new(WebSocketConnection::with_config(
			"active_conn".to_string(),
			active_tx,
			ConnectionConfig::new().with_idle_timeout(secs(300)),
		));

		monitor.register(idle_conn).await.unwrap();
		monitor.register(Arc::clone(&active_conn)).await.unwrap();

		// Let the idle timeout pass, then refresh only the active connection.
		tokio::time::sleep(millis(60)).await;
		active_conn.record_activity().await;

		let timed_out = monitor.check_idle_connections().await;

		// Exactly the idle connection is reported and evicted.
		assert_eq!(timed_out, ["idle_conn"]);
		assert_eq!(monitor.connection_count().await, 1);

		// The evicted connection was sent a close frame first.
		let Message::Close { code, reason } = idle_rx.recv().await.unwrap() else {
			panic!("Expected close message for idle connection");
		};
		assert_eq!(code, 1001);
		assert!(reason.contains("Idle timeout"));
	}

	#[rstest]
	#[tokio::test]
	async fn test_timeout_monitor_removes_already_closed_connections() {
		let monitor = ConnectionTimeoutMonitor::new(ConnectionConfig::new());
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::new("conn_1".to_string(), tx));
		conn.close().await.unwrap();
		monitor.register(conn).await.unwrap();

		let timed_out = monitor.check_idle_connections().await;

		assert_eq!(timed_out, ["conn_1"]);
		assert_eq!(monitor.connection_count().await, 0);
	}

	#[rstest]
	#[tokio::test]
	async fn test_timeout_monitor_background_task() {
		let monitor = Arc::new(ConnectionTimeoutMonitor::new(
			ConnectionConfig::new()
				.with_idle_timeout(millis(30))
				.with_cleanup_interval(millis(20)),
		));

		let (tx, mut rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::with_config(
			"bg_conn".to_string(),
			tx,
			ConnectionConfig::new().with_idle_timeout(millis(30)),
		));
		monitor.register(conn).await.unwrap();

		// Run the background task long enough for several cleanup cycles.
		let handle = monitor.start();
		tokio::time::sleep(millis(120)).await;

		assert_eq!(monitor.connection_count().await, 0);

		// The idle connection was sent a close frame.
		let frame = rx.recv().await.unwrap();
		assert!(matches!(frame, Message::Close { .. }));

		handle.abort();
	}

	// ---- PingPongConfig -------------------------------------------------------

	#[rstest]
	fn test_ping_pong_config_default() {
		let defaults = PingPongConfig::default();

		assert_eq!(defaults.ping_interval(), secs(30));
		assert_eq!(defaults.pong_timeout(), secs(10));
	}

	#[rstest]
	fn test_ping_pong_config_custom() {
		let custom = PingPongConfig::new(secs(15), secs(5));

		assert_eq!(custom.ping_interval(), secs(15));
		assert_eq!(custom.pong_timeout(), secs(5));
	}

	#[rstest]
	fn test_ping_pong_config_builder() {
		let built = PingPongConfig::default()
			.with_ping_interval(secs(60))
			.with_pong_timeout(secs(20));

		assert_eq!(built.ping_interval(), secs(60));
		assert_eq!(built.pong_timeout(), secs(20));
	}

	#[rstest]
	fn test_connection_config_has_default_ping_config() {
		let config = ConnectionConfig::new();
		let ping = config.ping_config();

		assert_eq!(ping.ping_interval(), secs(30));
		assert_eq!(ping.pong_timeout(), secs(10));
	}

	#[rstest]
	fn test_connection_config_with_custom_ping_config() {
		let custom_ping = PingPongConfig::new(secs(15), secs(5));

		let config = ConnectionConfig::new().with_ping_config(custom_ping);
		let ping = config.ping_config();

		assert_eq!(ping.ping_interval(), secs(15));
		assert_eq!(ping.pong_timeout(), secs(5));
	}

	#[rstest]
	fn test_strict_config_has_aggressive_ping() {
		let config = ConnectionConfig::strict();
		let ping = config.ping_config();

		assert_eq!(ping.ping_interval(), secs(10));
		assert_eq!(ping.pong_timeout(), secs(5));
	}

	#[rstest]
	fn test_permissive_config_has_relaxed_ping() {
		let config = ConnectionConfig::permissive();
		let ping = config.ping_config();

		assert_eq!(ping.ping_interval(), secs(60));
		assert_eq!(ping.pong_timeout(), secs(30));
	}

	// ---- Connection limit -----------------------------------------------------

	#[rstest]
	#[tokio::test]
	async fn test_timeout_monitor_rejects_when_max_connections_reached() {
		let monitor = ConnectionTimeoutMonitor::new(
			ConnectionConfig::new().with_max_connections(Some(1)),
		);

		let (first_tx, _first_rx) = mpsc::unbounded_channel();
		let first = Arc::new(WebSocketConnection::new("conn_1".to_string(), first_tx));

		let (second_tx, _second_rx) = mpsc::unbounded_channel();
		let second = Arc::new(WebSocketConnection::new("conn_2".to_string(), second_tx));

		monitor.register(first).await.unwrap();
		let second_attempt = monitor.register(second).await;

		assert!(second_attempt.is_err());
		assert_eq!(monitor.connection_count().await, 1);
	}

	// ---- Abnormal close paths ---------------------------------------------------

	#[rstest]
	#[tokio::test]
	async fn test_force_close_marks_connection_closed() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::new("test".to_string(), tx);

		conn.force_close().await;

		assert!(conn.is_closed().await);
	}

	#[rstest]
	#[tokio::test]
	async fn test_close_marks_closed_even_when_channel_dropped() {
		let (tx, rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::new("test".to_string(), tx);

		// A dropped receiver stands in for a broken channel.
		drop(rx);

		let outcome = conn.close().await;

		// The send fails, yet the connection is still marked closed.
		assert!(outcome.is_err());
		assert!(conn.is_closed().await);
	}

	#[rstest]
	#[tokio::test]
	async fn test_close_with_reason_marks_closed_even_when_channel_dropped() {
		let (tx, rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::new("test".to_string(), tx);

		// A dropped receiver stands in for a broken channel.
		drop(rx);

		let outcome = conn
			.close_with_reason(1006, "Abnormal close".to_string())
			.await;

		assert!(outcome.is_err());
		assert!(conn.is_closed().await);
	}

	#[rstest]
	#[tokio::test]
	async fn test_send_after_force_close_returns_error() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = WebSocketConnection::new("test".to_string(), tx);
		conn.force_close().await;

		let outcome = conn.send_text("should fail".to_string()).await;

		assert!(outcome.is_err());
		assert!(matches!(outcome, Err(WebSocketError::Send(_))));
	}

	// ---- HeartbeatConfig ----------------------------------------------------------

	#[rstest]
	fn test_heartbeat_config_default() {
		let defaults = HeartbeatConfig::default();

		assert_eq!(defaults.ping_interval(), secs(30));
		assert_eq!(defaults.pong_timeout(), secs(10));
	}

	#[rstest]
	fn test_heartbeat_config_custom() {
		let custom = HeartbeatConfig::new(secs(15), secs(5));

		assert_eq!(custom.ping_interval(), secs(15));
		assert_eq!(custom.pong_timeout(), secs(5));
	}

	// ---- HeartbeatMonitor -----------------------------------------------------------

	#[rstest]
	#[tokio::test]
	async fn test_heartbeat_monitor_initial_state() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::new("hb_test".to_string(), tx));

		let monitor = HeartbeatMonitor::new(conn, HeartbeatConfig::default());

		assert!(!monitor.is_timed_out().await);
		assert!(monitor.time_since_last_pong().await < secs(1));
	}

	#[rstest]
	#[tokio::test]
	async fn test_heartbeat_monitor_record_pong_resets_timer() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::new("hb_pong".to_string(), tx));
		let monitor = HeartbeatMonitor::new(conn, HeartbeatConfig::new(millis(50), millis(30)));

		// Let the pong timer age a little, then reset it.
		tokio::time::sleep(millis(20)).await;
		monitor.record_pong().await;

		assert!(monitor.time_since_last_pong().await < millis(10));
	}

	#[rstest]
	#[tokio::test]
	async fn test_heartbeat_monitor_timeout_closes_connection() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::new("hb_timeout".to_string(), tx));
		let monitor = HeartbeatMonitor::new(
			Arc::clone(&conn),
			HeartbeatConfig::new(millis(50), millis(30)),
		);

		// No pong is recorded while the pong timeout elapses.
		tokio::time::sleep(millis(40)).await;
		let expired = monitor.check_heartbeat().await;

		assert!(expired);
		assert!(monitor.is_timed_out().await);
		assert!(conn.is_closed().await);
	}

	#[rstest]
	#[tokio::test]
	async fn test_heartbeat_monitor_no_timeout_when_pong_received() {
		let (tx, _rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::new("hb_ok".to_string(), tx));
		let monitor = HeartbeatMonitor::new(
			Arc::clone(&conn),
			HeartbeatConfig::new(millis(100), millis(50)),
		);

		// A pong lands well inside the timeout window.
		tokio::time::sleep(millis(20)).await;
		monitor.record_pong().await;
		let expired = monitor.check_heartbeat().await;

		assert!(!expired);
		assert!(!monitor.is_timed_out().await);
		assert!(!conn.is_closed().await);
	}

	#[rstest]
	#[tokio::test]
	async fn test_heartbeat_monitor_send_ping() {
		let (tx, mut rx) = mpsc::unbounded_channel();
		let conn = Arc::new(WebSocketConnection::new("hb_ping".to_string(), tx));
		let monitor = HeartbeatMonitor::new(conn, HeartbeatConfig::default());

		monitor.send_ping().await.unwrap();

		let frame = rx.recv().await.unwrap();
		assert!(matches!(frame, Message::Ping));
	}

	// ---- WebSocketError display strings -----------------------------------------------

	#[rstest]
	fn test_websocket_error_binary_payload_variant() {
		let rendered = WebSocketError::BinaryPayload("invalid data".to_string()).to_string();

		assert_eq!(rendered, "Invalid binary payload: invalid data");
	}

	#[rstest]
	fn test_websocket_error_heartbeat_timeout_variant() {
		let rendered = WebSocketError::HeartbeatTimeout(secs(10)).to_string();

		assert_eq!(rendered, "Heartbeat timeout: no pong received within 10s");
	}

	#[rstest]
	fn test_websocket_error_slow_consumer_variant() {
		let rendered = WebSocketError::SlowConsumer(secs(5)).to_string();

		assert_eq!(rendered, "Slow consumer: send timed out after 5s");
	}
}