Skip to main content

tf_rust_engineio/transports/
websocket_secure.rs

1use crate::{
2    asynchronous::{
3        async_transports::WebsocketSecureTransport as AsyncWebsocketSecureTransport,
4        transport::AsyncTransport,
5    },
6    error::Result,
7    transport::Transport,
8    Error,
9};
10use bytes::Bytes;
11use http::HeaderMap;
12use native_tls::TlsConnector;
13use std::{sync::Arc, time::Duration};
14use tokio::runtime::Runtime;
15use url::Url;
16
17#[derive(Clone)]
18pub struct WebsocketSecureTransport {
19    runtime: Arc<Runtime>,
20    inner: Arc<AsyncWebsocketSecureTransport>,
21}
22
23impl WebsocketSecureTransport {
24    /// Creates an instance of `WebsocketSecureTransport`.
25    pub fn new(
26        base_url: Url,
27        tls_config: Option<TlsConnector>,
28        headers: Option<HeaderMap>,
29    ) -> Result<Self> {
30        let runtime = tokio::runtime::Builder::new_current_thread()
31            .enable_all()
32            .build()?;
33
34        let inner = runtime.block_on(AsyncWebsocketSecureTransport::new(
35            base_url, tls_config, headers,
36        ))?;
37
38        Ok(WebsocketSecureTransport {
39            runtime: Arc::new(runtime),
40            inner: Arc::new(inner),
41        })
42    }
43
44    /// Sends probe packet to ensure connection is valid, then sends upgrade
45    /// request
46    pub(crate) fn upgrade(&self) -> Result<()> {
47        self.runtime.block_on(async { self.inner.upgrade().await })
48    }
49}
50
51impl Transport for WebsocketSecureTransport {
52    fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
53        self.runtime
54            .block_on(async { self.inner.emit(data, is_binary_att).await })
55    }
56
57    fn poll(&self, timeout: Duration) -> Result<Bytes> {
58        self.runtime.block_on(async {
59            let r = match tokio::time::timeout(timeout, self.inner.poll_next()).await {
60                Ok(r) => r,
61                Err(_) => return Err(Error::PingTimeout()),
62            };
63            match r {
64                Ok(b) => b.ok_or(Error::IncompletePacket()),
65                // propagate the real transport error (e.g. `WebsocketClosed`)
66                // instead of masking it, so the close code is not lost
67                Err(e) => Err(e),
68            }
69        })
70    }
71
72    fn base_url(&self) -> Result<url::Url> {
73        self.runtime.block_on(async { self.inner.base_url().await })
74    }
75
76    fn set_base_url(&self, url: url::Url) -> Result<()> {
77        self.runtime
78            .block_on(async { self.inner.set_base_url(url).await })
79    }
80}
81
82impl std::fmt::Debug for WebsocketSecureTransport {
83    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
84        f.write_fmt(format_args!(
85            "WebsocketSecureTransport(base_url: {:?})",
86            self.base_url(),
87        ))
88    }
89}
90
91#[cfg(test)]
92mod test {
93    use super::*;
94    use crate::ENGINE_IO_VERSION;
95    use std::str::FromStr;
96    fn new() -> Result<WebsocketSecureTransport> {
97        let url = crate::test::engine_io_server_secure()?.to_string()
98            + "engine.io/?EIO="
99            + &ENGINE_IO_VERSION.to_string();
100        WebsocketSecureTransport::new(
101            Url::from_str(&url[..])?,
102            Some(crate::test::tls_connector()?),
103            None,
104        )
105    }
106
107    #[test]
108    fn websocket_secure_transport_base_url() -> Result<()> {
109        let transport = new()?;
110        let mut url = crate::test::engine_io_server_secure()?;
111        url.set_path("/engine.io/");
112        url.query_pairs_mut()
113            .append_pair("EIO", &ENGINE_IO_VERSION.to_string())
114            .append_pair("transport", "websocket");
115        url.set_scheme("wss").unwrap();
116        assert_eq!(transport.base_url()?.to_string(), url.to_string());
117        transport.set_base_url(reqwest::Url::parse("https://127.0.0.1")?)?;
118        assert_eq!(
119            transport.base_url()?.to_string(),
120            "wss://127.0.0.1/?transport=websocket"
121        );
122        assert_ne!(transport.base_url()?.to_string(), url.to_string());
123
124        transport.set_base_url(reqwest::Url::parse(
125            "http://127.0.0.1/?transport=websocket",
126        )?)?;
127        assert_eq!(
128            transport.base_url()?.to_string(),
129            "wss://127.0.0.1/?transport=websocket"
130        );
131        assert_ne!(transport.base_url()?.to_string(), url.to_string());
132        Ok(())
133    }
134
135    #[test]
136    fn websocket_secure_debug() -> Result<()> {
137        let transport = new()?;
138        assert_eq!(
139            format!("{:?}", transport),
140            format!(
141                "WebsocketSecureTransport(base_url: {:?})",
142                transport.base_url()
143            )
144        );
145        Ok(())
146    }
147}