aerosocket_client/
connection.rs1use aerosocket_core::prelude::Bytes;
6use aerosocket_core::{Message, Result};
7use std::net::SocketAddr;
8
9#[derive(Debug)]
11pub struct ClientConnection {
12 remote_addr: SocketAddr,
14 state: ConnectionState,
16 metadata: ConnectionMetadata,
18}
19
/// Lifecycle state of a client connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// Initial state of a freshly constructed connection.
    Connecting,
    /// The connection has been marked as connected.
    Connected,
    /// A close has been started but has not finished yet.
    Closing,
    /// The close has completed.
    Closed,
}
32
/// Bookkeeping data kept alongside a connection: recorded protocol details,
/// timestamps and traffic counters.
#[derive(Clone, Debug)]
pub struct ConnectionMetadata {
    /// Subprotocol recorded for the connection, if one has been set.
    pub subprotocol: Option<String>,
    /// Extensions recorded for the connection, in insertion order.
    pub extensions: Vec<String>,
    /// Instant used as the start of the connection's age.
    pub established_at: std::time::Instant,
    /// Instant of the most recent recorded activity.
    pub last_activity_at: std::time::Instant,
    /// Number of messages counted as sent.
    pub messages_sent: u64,
    /// Number of messages counted as received.
    pub messages_received: u64,
    /// Total payload bytes counted as sent.
    pub bytes_sent: u64,
    /// Total payload bytes counted as received.
    pub bytes_received: u64,
}
53
54impl ClientConnection {
55 pub fn new(remote_addr: SocketAddr) -> Self {
57 Self {
58 remote_addr,
59 state: ConnectionState::Connecting,
60 metadata: ConnectionMetadata {
61 subprotocol: None,
62 extensions: Vec::new(),
63 established_at: std::time::Instant::now(),
64 last_activity_at: std::time::Instant::now(),
65 messages_sent: 0,
66 messages_received: 0,
67 bytes_sent: 0,
68 bytes_received: 0,
69 },
70 }
71 }
72
73 pub fn remote_addr(&self) -> SocketAddr {
75 self.remote_addr
76 }
77
78 pub fn state(&self) -> ConnectionState {
80 self.state
81 }
82
83 pub fn metadata(&self) -> &ConnectionMetadata {
85 &self.metadata
86 }
87
88 pub async fn send(&mut self, message: Message) -> Result<()> {
90 self.metadata.messages_sent += 1;
92 let message_len = match &message {
93 Message::Text(text) => text.len(),
94 Message::Binary(data) => data.len(),
95 Message::Ping(data) => data.len(),
96 Message::Pong(data) => data.len(),
97 Message::Close(close_msg) => close_msg.len(),
98 };
99 self.metadata.bytes_sent += message_len as u64;
100 self.metadata.last_activity_at = std::time::Instant::now();
101 Ok(())
102 }
103
104 pub async fn send_text(&mut self, text: impl AsRef<str>) -> Result<()> {
106 let message = Message::text(text.as_ref().to_string());
107 self.send(message).await
108 }
109
110 pub async fn send_binary(&mut self, data: impl Into<Bytes>) -> Result<()> {
112 let message = Message::binary(data);
113 self.send(message).await
114 }
115
116 pub async fn ping(&mut self, data: Option<&[u8]>) -> Result<()> {
118 let message = Message::ping(data.map(|d| d.to_vec()));
119 self.send(message).await
120 }
121
122 pub async fn pong(&mut self, data: Option<&[u8]>) -> Result<()> {
124 let message = Message::pong(data.map(|d| d.to_vec()));
125 self.send(message).await
126 }
127
128 pub async fn next(&mut self) -> Result<Option<Message>> {
130 Ok(None)
132 }
133
134 pub async fn close(&mut self, code: Option<u16>, reason: Option<&str>) -> Result<()> {
136 self.state = ConnectionState::Closing;
137 let message = Message::close(code, reason.map(|s| s.to_string()));
138 self.send(message).await?;
139 self.state = ConnectionState::Closed;
140 Ok(())
141 }
142
143 pub fn is_connected(&self) -> bool {
145 self.state == ConnectionState::Connected
146 }
147
148 pub fn is_closed(&self) -> bool {
150 self.state == ConnectionState::Closed
151 }
152
153 pub fn age(&self) -> std::time::Duration {
155 self.metadata.established_at.elapsed()
156 }
157
158 pub fn idle_time(&self) -> std::time::Duration {
160 self.metadata.last_activity_at.elapsed()
161 }
162
163 pub fn set_connected(&mut self) {
165 self.state = ConnectionState::Connected;
166 self.metadata.established_at = std::time::Instant::now();
167 self.metadata.last_activity_at = std::time::Instant::now();
168 }
169
170 pub fn set_subprotocol(&mut self, subprotocol: String) {
172 self.metadata.subprotocol = Some(subprotocol);
173 }
174
175 pub fn add_extension(&mut self, extension: String) {
177 self.metadata.extensions.push(extension);
178 }
179}
180
181#[derive(Debug)]
183pub struct ClientConnectionHandle {
184 id: u64,
186 connection: std::sync::Arc<std::sync::Mutex<ClientConnection>>,
188}
189
190impl ClientConnectionHandle {
191 pub fn new(id: u64, connection: ClientConnection) -> Self {
193 Self {
194 id,
195 connection: std::sync::Arc::new(std::sync::Mutex::new(connection)),
196 }
197 }
198
199 pub fn id(&self) -> u64 {
201 self.id
202 }
203
204 pub fn connection(&self) -> &std::sync::Arc<std::sync::Mutex<ClientConnection>> {
206 &self.connection
207 }
208
209 pub fn try_lock(&self) -> aerosocket_core::Result<std::sync::MutexGuard<'_, ClientConnection>> {
211 self.connection
212 .try_lock()
213 .map_err(|e| aerosocket_core::Error::Other(format!("Poison error: {}", e)))
214 }
215}
216
217impl Clone for ClientConnectionHandle {
218 fn clone(&self) -> Self {
219 Self {
220 id: self.id,
221 connection: self.connection.clone(),
222 }
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the socket address shared by every test in this module.
    fn test_addr() -> std::net::SocketAddr {
        "127.0.0.1:8080".parse().unwrap()
    }

    /// A new connection reports its address and starts out `Connecting`.
    #[test]
    fn test_client_connection_creation() {
        let addr = test_addr();
        let connection = ClientConnection::new(addr);

        assert_eq!(connection.remote_addr(), addr);
        assert_eq!(connection.state(), ConnectionState::Connecting);
        assert!(!connection.is_connected());
        assert!(!connection.is_closed());
    }

    /// A handle keeps its id and can be locked when nothing else holds it.
    #[test]
    fn test_client_connection_handle() {
        let handle = ClientConnectionHandle::new(1, ClientConnection::new(test_addr()));

        assert_eq!(handle.id(), 1);
        assert!(handle.try_lock().is_ok());
    }

    /// `set_connected` moves the connection from `Connecting` to `Connected`.
    #[test]
    fn test_connection_state_transitions() {
        let mut connection = ClientConnection::new(test_addr());
        assert_eq!(connection.state(), ConnectionState::Connecting);

        connection.set_connected();
        assert_eq!(connection.state(), ConnectionState::Connected);
        assert!(connection.is_connected());
    }

    /// Every send helper succeeds on a freshly created connection.
    #[tokio::test]
    async fn test_message_sending() {
        let mut connection = ClientConnection::new(test_addr());

        assert!(connection.send_text("Hello").await.is_ok());
        assert!(connection.send_binary(&[1u8, 2, 3][..]).await.is_ok());
        assert!(connection.ping(None).await.is_ok());
        assert!(connection.pong(None).await.is_ok());
    }
}