http_request/request/socket/websocket/
impl.rs1use crate::*;
2
3impl WebSocket {
4 fn get_url(&self) -> String {
5 self.url.as_ref().clone()
6 }
7
8 fn generate_websocket_key() -> String {
9 let mut key_bytes: [u8; 16] = [0u8; 16];
10 let now: u64 = SystemTime::now()
11 .duration_since(UNIX_EPOCH)
12 .unwrap_or_default()
13 .as_nanos() as u64;
14 let ptr: usize = &key_bytes as *const _ as usize;
15 for (i, byte) in key_bytes.iter_mut().enumerate() {
16 *byte = ((now.wrapping_add(ptr as u64).wrapping_add(i as u64)) % 256) as u8;
17 }
18 base64_encode(&key_bytes)
19 }
20
21 fn get_headers(&self) -> Vec<(String, String)> {
22 let mut headers: Vec<(String, String)> = Vec::new();
23 for (key, value) in self.header.iter() {
24 if let Some(first_value) = value.front() {
25 headers.push((key.clone(), first_value.clone()));
26 }
27 }
28 headers
29 }
30
31 async fn connect_async_internal(&self) -> Result<(), WebSocketError> {
32 if self.connected.load(Ordering::Relaxed) {
33 return Ok(());
34 }
35 let url: String = self.get_url();
36 if url.is_empty() {
37 return Err(WebSocketError::invalid_url("URL is empty"));
38 }
39 let url_obj: HttpUrlComponents = SharedWebSocketBuilder::parse_url(&url)?;
40 if let Ok(mut config) = self.config.write() {
41 config.url_obj = url_obj;
42 }
43 let timeout_duration: Duration = Duration::from_millis(
44 self.config
45 .read()
46 .map(|c| c.timeout)
47 .unwrap_or(DEFAULT_HIGH_SECURITY_READ_TIMEOUT_MS),
48 );
49 let headers: Vec<(String, String)> = self.get_headers();
50 let mut request_builder = Request::builder().uri(&url);
51 for (key, value) in &headers {
52 request_builder = request_builder.header(key, value);
53 }
54 let request: Request = request_builder
55 .body(())
56 .map_err(|e| WebSocketError::invalid_url(format!("Failed to build request: {e}")))?;
57 let proxy_config: Option<ProxyConfig> = self
58 .config
59 .read()
60 .ok()
61 .and_then(|config| config.proxy.clone());
62 let ws_stream: WebSocketConnectionType = if let Some(proxy_config) = proxy_config {
63 let url_obj: HttpUrlComponents = self
64 .config
65 .read()
66 .map(|c| c.url_obj.clone())
67 .unwrap_or_default();
68 let target_host: String = url_obj.host.clone().unwrap_or_default();
69 let target_port: u16 = url_obj.port.unwrap_or_default();
70 let proxy_stream: BoxAsyncReadWrite = self
71 .get_proxy_connection_stream_async(target_host.clone(), target_port, &proxy_config)
72 .await?;
73 let proxy_tunnel_stream: WebSocketProxyTunnelStream =
74 WebSocketProxyTunnelStream::new(proxy_stream);
75 let mut proxy_request_builder = Request::builder().uri(&url);
76 proxy_request_builder = proxy_request_builder
77 .header(HOST, format!("{target_host}:{target_port}"))
78 .header(UPGRADE, "websocket")
79 .header(CONNECTION, "Upgrade")
80 .header(SEC_WEBSOCKET_VERSION, "13")
81 .header(SEC_WEBSOCKET_KEY, Self::generate_websocket_key());
82 for (key, value) in &headers {
83 proxy_request_builder = proxy_request_builder.header(key, value);
84 }
85 let protocols: Vec<String> = self
86 .config
87 .read()
88 .map(|c| c.protocols.clone())
89 .unwrap_or_default();
90 if !protocols.is_empty() {
91 proxy_request_builder =
92 proxy_request_builder.header("Sec-WebSocket-String", protocols.join(", "));
93 }
94 let proxy_request: Request = proxy_request_builder.body(()).map_err(|e| {
95 WebSocketError::invalid_url(format!("Failed to build proxy request: {e}"))
96 })?;
97 let connect_future = client_async_with_config(proxy_request, proxy_tunnel_stream, None);
98 let (ws_stream, _) = timeout(timeout_duration, connect_future)
99 .await
100 .map_err(|_| WebSocketError::timeout("Connection timeout"))?
101 .map_err(|e| {
102 let error_msg: String = e.to_string();
103 if error_msg.contains("tls")
104 || error_msg.contains("TLS")
105 || error_msg.contains("ssl")
106 || error_msg.contains("SSL")
107 || error_msg.contains("certificate")
108 || error_msg.contains("handshake")
109 {
110 WebSocketError::tls(error_msg)
111 } else {
112 WebSocketError::connection(error_msg)
113 }
114 })?;
115 WebSocketConnectionType::Proxy(ws_stream)
116 } else {
117 let connect_future = connect_async_with_config(request, None, false);
118 let (ws_stream, _) = timeout(timeout_duration, connect_future)
119 .await
120 .map_err(|_| WebSocketError::timeout("Connection timeout"))?
121 .map_err(|e| {
122 let error_msg: String = e.to_string();
123 if error_msg.contains("tls")
124 || error_msg.contains("TLS")
125 || error_msg.contains("ssl")
126 || error_msg.contains("SSL")
127 || error_msg.contains("certificate")
128 || error_msg.contains("handshake")
129 {
130 WebSocketError::tls(error_msg)
131 } else {
132 WebSocketError::connection(error_msg)
133 }
134 })?;
135 WebSocketConnectionType::Direct(ws_stream)
136 };
137 let mut connection: AsyncMutexGuard<'_, Option<WebSocketConnectionType>> =
138 self.connection.lock().await;
139 *connection = Some(ws_stream);
140 self.connected.store(true, Ordering::Relaxed);
141 Ok(())
142 }
143
144 async fn send_message_async(&self, message: Message) -> Result<(), WebSocketError> {
145 if !self.connected.load(Ordering::Relaxed) {
146 self.connect_async_internal().await?;
147 }
148 let mut connection: AsyncMutexGuard<'_, Option<WebSocketConnectionType>> =
149 self.connection.lock().await;
150 if let Some(ref mut ws_stream) = *connection {
151 ws_stream
152 .send(message)
153 .await
154 .map_err(|e| WebSocketError::protocol(e.to_string()))?;
155 } else {
156 return Err(WebSocketError::connection("Not connected"));
157 }
158 Ok(())
159 }
160
161 fn send_message_sync(&self, message: Message) -> Result<(), WebSocketError> {
162 let rt: Runtime = Runtime::new().map_err(|e| WebSocketError::io(e.to_string()))?;
163 rt.block_on(self.send_message_async(message))
164 }
165
166 async fn receive_message_async(&self) -> Result<WebSocketMessage, WebSocketError> {
167 if !self.connected.load(Ordering::Relaxed) {
168 return Err(WebSocketError::connection("Not connected"));
169 }
170 let timeout_duration: Duration = Duration::from_millis(
171 self.config
172 .read()
173 .map(|c| c.timeout)
174 .unwrap_or(DEFAULT_HIGH_SECURITY_READ_TIMEOUT_MS),
175 );
176 let mut connection: AsyncMutexGuard<'_, Option<WebSocketConnectionType>> =
177 self.connection.lock().await;
178 if let Some(ref mut ws_stream) = *connection {
179 let receive_future = ws_stream.next();
180 if let Some(msg_result) = timeout(timeout_duration, receive_future)
181 .await
182 .map_err(|_| WebSocketError::timeout("Receive timeout"))?
183 {
184 let message: Message =
185 msg_result.map_err(|e| WebSocketError::protocol(e.to_string()))?;
186 return Ok(self.convert_message(message));
187 }
188 }
189 Err(WebSocketError::connection("Connection closed"))
190 }
191
192 fn receive_message_sync(&self) -> Result<WebSocketMessage, WebSocketError> {
193 let rt: Runtime = Runtime::new().map_err(|e| WebSocketError::io(e.to_string()))?;
194 rt.block_on(self.receive_message_async())
195 }
196
197 fn convert_message(&self, message: Message) -> WebSocketMessage {
198 match message {
199 Message::Text(text) => WebSocketMessage::Text(text.to_string()),
200 Message::Binary(data) => WebSocketMessage::Binary(data.to_vec()),
201 Message::Ping(data) => WebSocketMessage::Ping(data.to_vec()),
202 Message::Pong(data) => WebSocketMessage::Pong(data.to_vec()),
203 Message::Close(_) => WebSocketMessage::Close,
204 Message::Frame(_) => WebSocketMessage::Close,
205 }
206 }
207
208 async fn close_async_internal(&self) -> Result<(), WebSocketError> {
209 let mut connection: AsyncMutexGuard<'_, Option<WebSocketConnectionType>> =
210 self.connection.lock().await;
211 if let Some(ref mut ws_stream) = *connection {
212 ws_stream
213 .send(Message::Close(None))
214 .await
215 .map_err(|e| WebSocketError::protocol(e.to_string()))?;
216 use futures::SinkExt;
217 ws_stream
218 .close()
219 .await
220 .map_err(|e| WebSocketError::protocol(e.to_string()))?;
221 }
222 *connection = None;
223 self.connected.store(false, Ordering::Relaxed);
224 Ok(())
225 }
226
227 fn close_sync(&self) -> Result<(), WebSocketError> {
228 let rt: Runtime = Runtime::new().map_err(|e| WebSocketError::io(e.to_string()))?;
229 rt.block_on(self.close_async_internal())
230 }
231
232 async fn get_proxy_connection_stream_async(
233 &self,
234 target_host: String,
235 target_port: u16,
236 proxy_config: &ProxyConfig,
237 ) -> Result<BoxAsyncReadWrite, WebSocketError> {
238 match proxy_config.proxy_type {
239 ProxyType::Http | ProxyType::Https => {
240 self.get_http_proxy_connection_async(target_host, target_port, proxy_config)
241 .await
242 }
243 ProxyType::Socks5 => {
244 self.get_socks5_proxy_connection_async(target_host, target_port, proxy_config)
245 .await
246 }
247 }
248 }
249
250 async fn get_http_proxy_connection_async(
251 &self,
252 target_host: String,
253 target_port: u16,
254 proxy_config: &ProxyConfig,
255 ) -> Result<BoxAsyncReadWrite, WebSocketError> {
256 let proxy_host_port: (String, u16) = (proxy_config.host.clone(), proxy_config.port);
257 let tcp_stream: AsyncTcpStream = AsyncTcpStream::connect(proxy_host_port)
258 .await
259 .map_err(|err| WebSocketError::connection(err.to_string()))?;
260 let mut proxy_stream: BoxAsyncReadWrite = if proxy_config.proxy_type == ProxyType::Https {
261 let roots: RootCertStore = RootCertStore {
262 roots: TLS_SERVER_ROOTS.to_vec(),
263 };
264 let tls_config: ClientConfig = ClientConfig::builder()
265 .with_root_certificates(roots)
266 .with_no_client_auth();
267 let connector: TlsConnector = TlsConnector::from(Arc::new(tls_config));
268 let dns_name: ServerName<'_> = ServerName::try_from(proxy_config.host.clone())
269 .map_err(|err| WebSocketError::tls(err.to_string()))?;
270 let tls_stream: TlsStream<AsyncTcpStream> = connector
271 .connect(dns_name, tcp_stream)
272 .await
273 .map_err(|err| WebSocketError::tls(err.to_string()))?;
274 Box::new(tls_stream)
275 } else {
276 Box::new(tcp_stream)
277 };
278 let connect_request: String = if let (Some(username), Some(password)) =
279 (&proxy_config.username, &proxy_config.password)
280 {
281 let auth: String = format!("{username}:{password}");
282 let auth_encoded: String = base64_encode(auth.as_bytes());
283 format!(
284 "CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\nProxy-Authorization: Basic {auth_encoded}\r\n\r\n"
285 )
286 } else {
287 format!(
288 "CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n\r\n"
289 )
290 };
291 proxy_stream
292 .write_all(connect_request.as_bytes())
293 .await
294 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
295 proxy_stream
296 .flush()
297 .await
298 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
299 let mut response_buffer: [u8; 1024] = [0u8; 1024];
300 let bytes_read: usize = proxy_stream
301 .read(&mut response_buffer)
302 .await
303 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
304 let response: Cow<'_, str> = String::from_utf8_lossy(&response_buffer[..bytes_read]);
305 if !response.starts_with("HTTP/1.1 200") && !response.starts_with("HTTP/1.0 200") {
306 return Err(WebSocketError::connection(format!(
307 "Proxy connection failed: {}",
308 response.lines().next().unwrap_or("Unknown error")
309 )));
310 }
311 Ok(proxy_stream)
312 }
313
314 async fn get_socks5_proxy_connection_async(
315 &self,
316 target_host: String,
317 target_port: u16,
318 proxy_config: &ProxyConfig,
319 ) -> Result<BoxAsyncReadWrite, WebSocketError> {
320 let proxy_host_port: (String, u16) = (proxy_config.host.clone(), proxy_config.port);
321 let mut tcp_stream: AsyncTcpStream = AsyncTcpStream::connect(proxy_host_port)
322 .await
323 .map_err(|err| WebSocketError::connection(err.to_string()))?;
324 let auth_methods: Vec<u8> =
325 if proxy_config.username.is_some() && proxy_config.password.is_some() {
326 vec![0x05, 0x02, 0x00, 0x02]
327 } else {
328 vec![0x05, 0x01, 0x00]
329 };
330 tcp_stream
331 .write_all(&auth_methods)
332 .await
333 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
334 let mut response: [u8; 2] = [0u8; 2];
335 tcp_stream
336 .read_exact(&mut response)
337 .await
338 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
339 if response[0] != 0x05 {
340 return Err(WebSocketError::protocol("Invalid SOCKS5 response"));
341 }
342 match response[1] {
343 0x00 => {}
344 0x02 => {
345 if let (Some(username), Some(password)) =
346 (&proxy_config.username, &proxy_config.password)
347 {
348 let mut auth_request = vec![0x01];
349 auth_request.push(username.len() as u8);
350 auth_request.extend_from_slice(username.as_bytes());
351 auth_request.push(password.len() as u8);
352 auth_request.extend_from_slice(password.as_bytes());
353
354 tcp_stream
355 .write_all(&auth_request)
356 .await
357 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
358
359 let mut auth_response = [0u8; 2];
360 tcp_stream
361 .read_exact(&mut auth_response)
362 .await
363 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
364
365 if auth_response[1] != 0x00 {
366 return Err(WebSocketError::protocol("SOCKS5 authentication failed"));
367 }
368 } else {
369 return Err(WebSocketError::protocol(
370 "SOCKS5 proxy requires authentication",
371 ));
372 }
373 }
374 0xFF => {
375 return Err(WebSocketError::protocol(
376 "No acceptable SOCKS5 authentication methods",
377 ));
378 }
379 _ => {
380 return Err(WebSocketError::protocol(
381 "Unsupported SOCKS5 authentication method",
382 ));
383 }
384 }
385 let mut connect_request: Vec<u8> = vec![0x05, 0x01, 0x00];
386 if target_host.parse::<Ipv4Addr>().is_ok() {
387 connect_request.push(0x01);
388 let ip: Ipv4Addr = target_host.parse().unwrap();
389 connect_request.extend_from_slice(&ip.octets());
390 } else if target_host.parse::<Ipv6Addr>().is_ok() {
391 connect_request.push(0x04);
392 let ip: Ipv6Addr = target_host.parse().unwrap();
393 connect_request.extend_from_slice(&ip.octets());
394 } else {
395 connect_request.push(0x03);
396 connect_request.push(target_host.len() as u8);
397 connect_request.extend_from_slice(target_host.as_bytes());
398 }
399 connect_request.extend_from_slice(&target_port.to_be_bytes());
400 tcp_stream
401 .write_all(&connect_request)
402 .await
403 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
404
405 let mut connect_response: [u8; 4] = [0u8; 4];
406 tcp_stream
407 .read_exact(&mut connect_response)
408 .await
409 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
410
411 if connect_response[0] != 0x05 || connect_response[1] != 0x00 {
412 return Err(WebSocketError::protocol(format!(
413 "SOCKS5 connection failed with code: {}",
414 connect_response[1]
415 )));
416 }
417 match connect_response[3] {
418 0x01 => {
419 let mut skip: [u8; 6] = [0u8; 6];
420 tcp_stream
421 .read_exact(&mut skip)
422 .await
423 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
424 }
425 0x03 => {
426 let mut len: [u8; 1] = [0u8; 1];
427 tcp_stream
428 .read_exact(&mut len)
429 .await
430 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
431 let mut skip: Vec<u8> = vec![0u8; len[0] as usize + 2];
432 tcp_stream
433 .read_exact(&mut skip)
434 .await
435 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
436 }
437 0x04 => {
438 let mut skip: [u8; 18] = [0u8; 18];
439 tcp_stream
440 .read_exact(&mut skip)
441 .await
442 .map_err(|err| WebSocketError::protocol(err.to_string()))?;
443 }
444 _ => {
445 return Err(WebSocketError::protocol("Invalid SOCKS5 address type"));
446 }
447 }
448 let proxy_stream: BoxAsyncReadWrite = Box::new(tcp_stream);
449 Ok(proxy_stream)
450 }
451
452 pub fn send_text(&mut self, text: &str) -> WebSocketResult {
462 let message: Message = Message::Text(text.into());
463 self.send_message_sync(message)
464 }
465
466 pub fn send_binary(&mut self, data: &[u8]) -> WebSocketResult {
476 let message: Message = Message::Binary(data.to_vec().into());
477 self.send_message_sync(message)
478 }
479
480 pub fn send_ping(&mut self, data: &[u8]) -> WebSocketResult {
490 let message: Message = Message::Ping(data.to_vec().into());
491 self.send_message_sync(message)
492 }
493
494 pub fn send_pong(&mut self, data: &[u8]) -> WebSocketResult {
504 let message: Message = Message::Pong(data.to_vec().into());
505 self.send_message_sync(message)
506 }
507
508 pub fn receive(&mut self) -> WebSocketMessageResult {
514 self.receive_message_sync()
515 }
516
517 pub fn close(&mut self) -> WebSocketResult {
523 self.close_sync()
524 }
525
526 pub fn is_connected(&self) -> bool {
532 self.connected.load(Ordering::Relaxed)
533 }
534
535 pub async fn send_text_async(&mut self, text: &str) -> WebSocketResult {
545 let message: Message = Message::Text(text.into());
546 self.send_message_async(message).await
547 }
548
549 pub async fn send_binary_async(&mut self, data: &[u8]) -> WebSocketResult {
559 let message: Message = Message::Binary(data.to_vec().into());
560 self.send_message_async(message).await
561 }
562
563 pub async fn send_ping_async(&mut self, data: &[u8]) -> WebSocketResult {
573 let message: Message = Message::Ping(data.to_vec().into());
574 self.send_message_async(message).await
575 }
576
577 pub async fn send_pong_async(&mut self, data: &[u8]) -> WebSocketResult {
587 let message: Message = Message::Pong(data.to_vec().into());
588 self.send_message_async(message).await
589 }
590
591 pub async fn receive_async(&mut self) -> WebSocketMessageResult {
597 self.receive_message_async().await
598 }
599
600 pub async fn close_async_method(&mut self) -> WebSocketResult {
606 self.close_async_internal().await
607 }
608}
609
610impl WebSocketTrait for WebSocket {
618 fn send_text(&mut self, text: &str) -> WebSocketResult {
619 self.send_text(text)
620 }
621
622 fn send_binary(&mut self, data: &[u8]) -> WebSocketResult {
623 self.send_binary(data)
624 }
625
626 fn send_ping(&mut self, data: &[u8]) -> WebSocketResult {
627 self.send_ping(data)
628 }
629
630 fn send_pong(&mut self, data: &[u8]) -> WebSocketResult {
631 self.send_pong(data)
632 }
633
634 fn receive(&mut self) -> WebSocketMessageResult {
635 self.receive()
636 }
637
638 fn close(&mut self) -> WebSocketResult {
639 self.close()
640 }
641
642 fn is_connected(&self) -> bool {
643 self.is_connected()
644 }
645}
646
647impl AsyncWebSocketTrait for WebSocket {
655 fn send_text<'a>(
656 &'a mut self,
657 text: &'a str,
658 ) -> Pin<Box<dyn Future<Output = WebSocketResult> + Send + 'a>> {
659 Box::pin(self.send_text_async(text))
660 }
661
662 fn send_binary<'a>(
663 &'a mut self,
664 data: &'a [u8],
665 ) -> Pin<Box<dyn Future<Output = WebSocketResult> + Send + 'a>> {
666 Box::pin(self.send_binary_async(data))
667 }
668
669 fn send_ping<'a>(
670 &'a mut self,
671 data: &'a [u8],
672 ) -> Pin<Box<dyn Future<Output = WebSocketResult> + Send + 'a>> {
673 Box::pin(self.send_ping_async(data))
674 }
675
676 fn send_pong<'a>(
677 &'a mut self,
678 data: &'a [u8],
679 ) -> Pin<Box<dyn Future<Output = WebSocketResult> + Send + 'a>> {
680 Box::pin(self.send_pong_async(data))
681 }
682
683 fn receive(&mut self) -> Pin<Box<dyn Future<Output = WebSocketMessageResult> + Send + '_>> {
684 Box::pin(self.receive_async())
685 }
686
687 fn close(&mut self) -> Pin<Box<dyn Future<Output = WebSocketResult> + Send + '_>> {
688 Box::pin(self.close_async_method())
689 }
690
691 fn is_connected(&self) -> bool {
692 self.is_connected()
693 }
694}