Skip to main content

tf_rust_engineio/transports/
polling.rs

1use crate::error::{Error, Result};
2use crate::transport::Transport;
3use base64::{engine::general_purpose, Engine as _};
4use bytes::{BufMut, Bytes, BytesMut};
5use native_tls::TlsConnector;
6use reqwest::{
7    blocking::{Client, ClientBuilder},
8    header::HeaderMap,
9};
10use std::sync::{Arc, RwLock};
11use std::time::Duration;
12use url::Url;
13
14#[derive(Debug, Clone)]
15pub struct PollingTransport {
16    client: Arc<Client>,
17    base_url: Arc<RwLock<Url>>,
18}
19
20impl PollingTransport {
21    /// Creates an instance of `PollingTransport`.
22    pub fn new(
23        base_url: Url,
24        tls_config: Option<TlsConnector>,
25        opening_headers: Option<HeaderMap>,
26    ) -> Self {
27        let client = match (tls_config, opening_headers) {
28            (Some(config), Some(map)) => ClientBuilder::new()
29                .use_preconfigured_tls(config)
30                .default_headers(map)
31                .build()
32                .unwrap(),
33            (Some(config), None) => ClientBuilder::new()
34                .use_preconfigured_tls(config)
35                .build()
36                .unwrap(),
37            (None, Some(map)) => ClientBuilder::new().default_headers(map).build().unwrap(),
38            (None, None) => Client::new(),
39        };
40
41        let mut url = base_url;
42        url.query_pairs_mut().append_pair("transport", "polling");
43
44        PollingTransport {
45            client: Arc::new(client),
46            base_url: Arc::new(RwLock::new(url)),
47        }
48    }
49}
50
51impl Transport for PollingTransport {
52    fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
53        let data_to_send = if is_binary_att {
54            // the binary attachment gets `base64` encoded
55            let mut packet_bytes = BytesMut::with_capacity(data.len() + 1);
56            packet_bytes.put_u8(b'b');
57
58            let encoded_data = general_purpose::STANDARD.encode(data);
59            packet_bytes.put(encoded_data.as_bytes());
60
61            packet_bytes.freeze()
62        } else {
63            data
64        };
65        let status = self
66            .client
67            .post(self.address()?)
68            .body(data_to_send)
69            .send()?
70            .status()
71            .as_u16();
72
73        if status != 200 {
74            let error = Error::IncompleteHttp(status);
75            return Err(error);
76        }
77
78        Ok(())
79    }
80
81    fn poll(&self, timeout: Duration) -> Result<Bytes> {
82        Ok(self
83            .client
84            .get(self.address()?)
85            .timeout(timeout)
86            .send()?
87            .bytes()?)
88    }
89
90    fn base_url(&self) -> Result<Url> {
91        Ok(self.base_url.read()?.clone())
92    }
93
94    fn set_base_url(&self, base_url: Url) -> Result<()> {
95        let mut url = base_url;
96        if !url
97            .query_pairs()
98            .any(|(k, v)| k == "transport" && v == "polling")
99        {
100            url.query_pairs_mut().append_pair("transport", "polling");
101        }
102        *self.base_url.write()? = url;
103        Ok(())
104    }
105}
106
107#[cfg(test)]
108mod test {
109    use super::*;
110    use std::str::FromStr;
111    #[test]
112    fn polling_transport_base_url() -> Result<()> {
113        let url = crate::test::engine_io_server()?.to_string();
114        let transport = PollingTransport::new(Url::from_str(&url[..]).unwrap(), None, None);
115        assert_eq!(
116            transport.base_url()?.to_string(),
117            url.clone() + "?transport=polling"
118        );
119        transport.set_base_url(Url::parse("https://127.0.0.1")?)?;
120        assert_eq!(
121            transport.base_url()?.to_string(),
122            "https://127.0.0.1/?transport=polling"
123        );
124        assert_ne!(transport.base_url()?.to_string(), url);
125
126        transport.set_base_url(Url::parse("http://127.0.0.1/?transport=polling")?)?;
127        assert_eq!(
128            transport.base_url()?.to_string(),
129            "http://127.0.0.1/?transport=polling"
130        );
131        assert_ne!(transport.base_url()?.to_string(), url);
132        Ok(())
133    }
134
135    #[test]
136    fn transport_debug() -> Result<()> {
137        let mut url = crate::test::engine_io_server()?;
138        let transport =
139            PollingTransport::new(Url::from_str(&url.to_string()[..]).unwrap(), None, None);
140        url.query_pairs_mut().append_pair("transport", "polling");
141        assert_eq!(format!("PollingTransport {{ client: {:?}, base_url: RwLock {{ data: {:?}, poisoned: false, .. }} }}", transport.client, url), format!("{:?}", transport));
142        let test: Box<dyn Transport> = Box::new(transport);
143        assert_eq!(
144            format!("Transport(base_url: Ok({:?}))", url),
145            format!("{:?}", test)
146        );
147        Ok(())
148    }
149}