Skip to main content

tf_rust_engineio/transports/
websocket_secure.rs

1use crate::{
2    asynchronous::{
3        async_transports::WebsocketSecureTransport as AsyncWebsocketSecureTransport,
4        transport::AsyncTransport,
5    },
6    error::Result,
7    transport::Transport,
8    Error,
9};
10use bytes::Bytes;
11use http::HeaderMap;
12use native_tls::TlsConnector;
13use std::{sync::Arc, time::Duration};
14use tokio::runtime::Runtime;
15use url::Url;
16
17#[derive(Clone)]
18pub struct WebsocketSecureTransport {
19    runtime: Arc<Runtime>,
20    inner: Arc<AsyncWebsocketSecureTransport>,
21}
22
23impl WebsocketSecureTransport {
24    /// Creates an instance of `WebsocketSecureTransport`.
25    pub fn new(
26        base_url: Url,
27        tls_config: Option<TlsConnector>,
28        headers: Option<HeaderMap>,
29    ) -> Result<Self> {
30        let runtime = tokio::runtime::Builder::new_current_thread()
31            .enable_all()
32            .build()?;
33
34        let inner = runtime.block_on(AsyncWebsocketSecureTransport::new(
35            base_url, tls_config, headers,
36        ))?;
37
38        Ok(WebsocketSecureTransport {
39            runtime: Arc::new(runtime),
40            inner: Arc::new(inner),
41        })
42    }
43
44    /// Sends probe packet to ensure connection is valid, then sends upgrade
45    /// request
46    pub(crate) fn upgrade(&self) -> Result<()> {
47        self.runtime.block_on(async { self.inner.upgrade().await })
48    }
49}
50
51impl Transport for WebsocketSecureTransport {
52    fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
53        self.runtime
54            .block_on(async { self.inner.emit(data, is_binary_att).await })
55    }
56
57    fn poll(&self, timeout: Duration) -> Result<Bytes> {
58        self.runtime.block_on(async {
59            let r = match tokio::time::timeout(timeout, self.inner.poll_next()).await {
60                Ok(r) => r,
61                Err(_) => return Err(Error::PingTimeout()),
62            };
63            match r {
64                Ok(b) => b.ok_or(Error::IncompletePacket()),
65                Err(_) => Err(Error::IncompletePacket()),
66            }
67        })
68    }
69
70    fn base_url(&self) -> Result<url::Url> {
71        self.runtime.block_on(async { self.inner.base_url().await })
72    }
73
74    fn set_base_url(&self, url: url::Url) -> Result<()> {
75        self.runtime
76            .block_on(async { self.inner.set_base_url(url).await })
77    }
78}
79
80impl std::fmt::Debug for WebsocketSecureTransport {
81    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
82        f.write_fmt(format_args!(
83            "WebsocketSecureTransport(base_url: {:?})",
84            self.base_url(),
85        ))
86    }
87}
88
#[cfg(test)]
mod test {
    use super::*;
    use crate::ENGINE_IO_VERSION;
    use std::str::FromStr;

    /// Builds a transport aimed at the secure engine.io test server, using the
    /// test TLS connector and no extra headers.
    fn new() -> Result<WebsocketSecureTransport> {
        let server = crate::test::engine_io_server_secure()?;
        let address = format!("{}engine.io/?EIO={}", server, ENGINE_IO_VERSION);
        let base_url = Url::from_str(&address)?;
        let connector = crate::test::tls_connector()?;
        WebsocketSecureTransport::new(base_url, Some(connector), None)
    }

    /// The base url is rewritten to `wss` with a `transport=websocket` query,
    /// both on construction and whenever it is replaced.
    #[test]
    fn websocket_secure_transport_base_url() -> Result<()> {
        let transport = new()?;

        let mut expected = crate::test::engine_io_server_secure()?;
        expected.set_path("/engine.io/");
        expected
            .query_pairs_mut()
            .append_pair("EIO", &ENGINE_IO_VERSION.to_string())
            .append_pair("transport", "websocket");
        expected.set_scheme("wss").unwrap();
        let expected = expected.to_string();

        assert_eq!(transport.base_url()?.to_string(), expected);

        // Both replacement urls must normalise to the same websocket url.
        let replacements = ["https://127.0.0.1", "http://127.0.0.1/?transport=websocket"];
        for replacement in &replacements {
            transport.set_base_url(reqwest::Url::parse(replacement)?)?;
            assert_eq!(
                transport.base_url()?.to_string(),
                "wss://127.0.0.1/?transport=websocket"
            );
            assert_ne!(transport.base_url()?.to_string(), expected);
        }
        Ok(())
    }

    /// The `Debug` output wraps the `Debug` form of `base_url()`.
    #[test]
    fn websocket_secure_debug() -> Result<()> {
        let transport = new()?;
        let actual = format!("{:?}", transport);
        let expected = format!(
            "WebsocketSecureTransport(base_url: {:?})",
            transport.base_url()
        );
        assert_eq!(actual, expected);
        Ok(())
    }
}