tf_rust_engineio/transports/
websocket_secure.rs1use crate::{
2 asynchronous::{
3 async_transports::WebsocketSecureTransport as AsyncWebsocketSecureTransport,
4 transport::AsyncTransport,
5 },
6 error::Result,
7 transport::Transport,
8 Error,
9};
10use bytes::Bytes;
11use http::HeaderMap;
12use native_tls::TlsConnector;
13use std::{sync::Arc, time::Duration};
14use tokio::runtime::Runtime;
15use url::Url;
16
17#[derive(Clone)]
18pub struct WebsocketSecureTransport {
19 runtime: Arc<Runtime>,
20 inner: Arc<AsyncWebsocketSecureTransport>,
21}
22
23impl WebsocketSecureTransport {
24 pub fn new(
26 base_url: Url,
27 tls_config: Option<TlsConnector>,
28 headers: Option<HeaderMap>,
29 ) -> Result<Self> {
30 let runtime = tokio::runtime::Builder::new_current_thread()
31 .enable_all()
32 .build()?;
33
34 let inner = runtime.block_on(AsyncWebsocketSecureTransport::new(
35 base_url, tls_config, headers,
36 ))?;
37
38 Ok(WebsocketSecureTransport {
39 runtime: Arc::new(runtime),
40 inner: Arc::new(inner),
41 })
42 }
43
44 pub(crate) fn upgrade(&self) -> Result<()> {
47 self.runtime.block_on(async { self.inner.upgrade().await })
48 }
49}
50
51impl Transport for WebsocketSecureTransport {
52 fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
53 self.runtime
54 .block_on(async { self.inner.emit(data, is_binary_att).await })
55 }
56
57 fn poll(&self, timeout: Duration) -> Result<Bytes> {
58 self.runtime.block_on(async {
59 let r = match tokio::time::timeout(timeout, self.inner.poll_next()).await {
60 Ok(r) => r,
61 Err(_) => return Err(Error::PingTimeout()),
62 };
63 match r {
64 Ok(b) => b.ok_or(Error::IncompletePacket()),
65 Err(_) => Err(Error::IncompletePacket()),
66 }
67 })
68 }
69
70 fn base_url(&self) -> Result<url::Url> {
71 self.runtime.block_on(async { self.inner.base_url().await })
72 }
73
74 fn set_base_url(&self, url: url::Url) -> Result<()> {
75 self.runtime
76 .block_on(async { self.inner.set_base_url(url).await })
77 }
78}
79
80impl std::fmt::Debug for WebsocketSecureTransport {
81 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
82 f.write_fmt(format_args!(
83 "WebsocketSecureTransport(base_url: {:?})",
84 self.base_url(),
85 ))
86 }
87}
88
89#[cfg(test)]
90mod test {
91 use super::*;
92 use crate::ENGINE_IO_VERSION;
93 use std::str::FromStr;
94 fn new() -> Result<WebsocketSecureTransport> {
95 let url = crate::test::engine_io_server_secure()?.to_string()
96 + "engine.io/?EIO="
97 + &ENGINE_IO_VERSION.to_string();
98 WebsocketSecureTransport::new(
99 Url::from_str(&url[..])?,
100 Some(crate::test::tls_connector()?),
101 None,
102 )
103 }
104
105 #[test]
106 fn websocket_secure_transport_base_url() -> Result<()> {
107 let transport = new()?;
108 let mut url = crate::test::engine_io_server_secure()?;
109 url.set_path("/engine.io/");
110 url.query_pairs_mut()
111 .append_pair("EIO", &ENGINE_IO_VERSION.to_string())
112 .append_pair("transport", "websocket");
113 url.set_scheme("wss").unwrap();
114 assert_eq!(transport.base_url()?.to_string(), url.to_string());
115 transport.set_base_url(reqwest::Url::parse("https://127.0.0.1")?)?;
116 assert_eq!(
117 transport.base_url()?.to_string(),
118 "wss://127.0.0.1/?transport=websocket"
119 );
120 assert_ne!(transport.base_url()?.to_string(), url.to_string());
121
122 transport.set_base_url(reqwest::Url::parse(
123 "http://127.0.0.1/?transport=websocket",
124 )?)?;
125 assert_eq!(
126 transport.base_url()?.to_string(),
127 "wss://127.0.0.1/?transport=websocket"
128 );
129 assert_ne!(transport.base_url()?.to_string(), url.to_string());
130 Ok(())
131 }
132
133 #[test]
134 fn websocket_secure_debug() -> Result<()> {
135 let transport = new()?;
136 assert_eq!(
137 format!("{:?}", transport),
138 format!(
139 "WebsocketSecureTransport(base_url: {:?})",
140 transport.base_url()
141 )
142 );
143 Ok(())
144 }
145}