tf_rust_engineio/transports/
websocket_secure.rs1use crate::{
2 asynchronous::{
3 async_transports::WebsocketSecureTransport as AsyncWebsocketSecureTransport,
4 transport::AsyncTransport,
5 },
6 error::Result,
7 transport::Transport,
8 Error,
9};
10use bytes::Bytes;
11use http::HeaderMap;
12use native_tls::TlsConnector;
13use std::{sync::Arc, time::Duration};
14use tokio::runtime::Runtime;
15use url::Url;
16
17#[derive(Clone)]
18pub struct WebsocketSecureTransport {
19 runtime: Arc<Runtime>,
20 inner: Arc<AsyncWebsocketSecureTransport>,
21}
22
23impl WebsocketSecureTransport {
24 pub fn new(
26 base_url: Url,
27 tls_config: Option<TlsConnector>,
28 headers: Option<HeaderMap>,
29 ) -> Result<Self> {
30 let runtime = tokio::runtime::Builder::new_current_thread()
31 .enable_all()
32 .build()?;
33
34 let inner = runtime.block_on(AsyncWebsocketSecureTransport::new(
35 base_url, tls_config, headers,
36 ))?;
37
38 Ok(WebsocketSecureTransport {
39 runtime: Arc::new(runtime),
40 inner: Arc::new(inner),
41 })
42 }
43
44 pub(crate) fn upgrade(&self) -> Result<()> {
47 self.runtime.block_on(async { self.inner.upgrade().await })
48 }
49}
50
51impl Transport for WebsocketSecureTransport {
52 fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
53 self.runtime
54 .block_on(async { self.inner.emit(data, is_binary_att).await })
55 }
56
57 fn poll(&self, timeout: Duration) -> Result<Bytes> {
58 self.runtime.block_on(async {
59 let r = match tokio::time::timeout(timeout, self.inner.poll_next()).await {
60 Ok(r) => r,
61 Err(_) => return Err(Error::PingTimeout()),
62 };
63 match r {
64 Ok(b) => b.ok_or(Error::IncompletePacket()),
65 Err(e) => Err(e),
68 }
69 })
70 }
71
72 fn base_url(&self) -> Result<url::Url> {
73 self.runtime.block_on(async { self.inner.base_url().await })
74 }
75
76 fn set_base_url(&self, url: url::Url) -> Result<()> {
77 self.runtime
78 .block_on(async { self.inner.set_base_url(url).await })
79 }
80}
81
82impl std::fmt::Debug for WebsocketSecureTransport {
83 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
84 f.write_fmt(format_args!(
85 "WebsocketSecureTransport(base_url: {:?})",
86 self.base_url(),
87 ))
88 }
89}
90
91#[cfg(test)]
92mod test {
93 use super::*;
94 use crate::ENGINE_IO_VERSION;
95 use std::str::FromStr;
96 fn new() -> Result<WebsocketSecureTransport> {
97 let url = crate::test::engine_io_server_secure()?.to_string()
98 + "engine.io/?EIO="
99 + &ENGINE_IO_VERSION.to_string();
100 WebsocketSecureTransport::new(
101 Url::from_str(&url[..])?,
102 Some(crate::test::tls_connector()?),
103 None,
104 )
105 }
106
107 #[test]
108 fn websocket_secure_transport_base_url() -> Result<()> {
109 let transport = new()?;
110 let mut url = crate::test::engine_io_server_secure()?;
111 url.set_path("/engine.io/");
112 url.query_pairs_mut()
113 .append_pair("EIO", &ENGINE_IO_VERSION.to_string())
114 .append_pair("transport", "websocket");
115 url.set_scheme("wss").unwrap();
116 assert_eq!(transport.base_url()?.to_string(), url.to_string());
117 transport.set_base_url(reqwest::Url::parse("https://127.0.0.1")?)?;
118 assert_eq!(
119 transport.base_url()?.to_string(),
120 "wss://127.0.0.1/?transport=websocket"
121 );
122 assert_ne!(transport.base_url()?.to_string(), url.to_string());
123
124 transport.set_base_url(reqwest::Url::parse(
125 "http://127.0.0.1/?transport=websocket",
126 )?)?;
127 assert_eq!(
128 transport.base_url()?.to_string(),
129 "wss://127.0.0.1/?transport=websocket"
130 );
131 assert_ne!(transport.base_url()?.to_string(), url.to_string());
132 Ok(())
133 }
134
135 #[test]
136 fn websocket_secure_debug() -> Result<()> {
137 let transport = new()?;
138 assert_eq!(
139 format!("{:?}", transport),
140 format!(
141 "WebsocketSecureTransport(base_url: {:?})",
142 transport.base_url()
143 )
144 );
145 Ok(())
146 }
147}