tf_rust_engineio/transports/
polling.rs1use crate::error::{Error, Result};
2use crate::transport::Transport;
3use base64::{engine::general_purpose, Engine as _};
4use bytes::{BufMut, Bytes, BytesMut};
5use native_tls::TlsConnector;
6use reqwest::{
7 blocking::{Client, ClientBuilder},
8 header::HeaderMap,
9};
10use std::sync::{Arc, RwLock};
11use std::time::Duration;
12use url::Url;
13
14#[derive(Debug, Clone)]
15pub struct PollingTransport {
16 client: Arc<Client>,
17 base_url: Arc<RwLock<Url>>,
18}
19
20impl PollingTransport {
21 pub fn new(
23 base_url: Url,
24 tls_config: Option<TlsConnector>,
25 opening_headers: Option<HeaderMap>,
26 ) -> Self {
27 let client = match (tls_config, opening_headers) {
28 (Some(config), Some(map)) => ClientBuilder::new()
29 .use_preconfigured_tls(config)
30 .default_headers(map)
31 .build()
32 .unwrap(),
33 (Some(config), None) => ClientBuilder::new()
34 .use_preconfigured_tls(config)
35 .build()
36 .unwrap(),
37 (None, Some(map)) => ClientBuilder::new().default_headers(map).build().unwrap(),
38 (None, None) => Client::new(),
39 };
40
41 let mut url = base_url;
42 url.query_pairs_mut().append_pair("transport", "polling");
43
44 PollingTransport {
45 client: Arc::new(client),
46 base_url: Arc::new(RwLock::new(url)),
47 }
48 }
49}
50
51impl Transport for PollingTransport {
52 fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
53 let data_to_send = if is_binary_att {
54 let mut packet_bytes = BytesMut::with_capacity(data.len() + 1);
56 packet_bytes.put_u8(b'b');
57
58 let encoded_data = general_purpose::STANDARD.encode(data);
59 packet_bytes.put(encoded_data.as_bytes());
60
61 packet_bytes.freeze()
62 } else {
63 data
64 };
65 let status = self
66 .client
67 .post(self.address()?)
68 .body(data_to_send)
69 .send()?
70 .status()
71 .as_u16();
72
73 if status != 200 {
74 let error = Error::IncompleteHttp(status);
75 return Err(error);
76 }
77
78 Ok(())
79 }
80
81 fn poll(&self, timeout: Duration) -> Result<Bytes> {
82 Ok(self
83 .client
84 .get(self.address()?)
85 .timeout(timeout)
86 .send()?
87 .bytes()?)
88 }
89
90 fn base_url(&self) -> Result<Url> {
91 Ok(self.base_url.read()?.clone())
92 }
93
94 fn set_base_url(&self, base_url: Url) -> Result<()> {
95 let mut url = base_url;
96 if !url
97 .query_pairs()
98 .any(|(k, v)| k == "transport" && v == "polling")
99 {
100 url.query_pairs_mut().append_pair("transport", "polling");
101 }
102 *self.base_url.write()? = url;
103 Ok(())
104 }
105}
106
#[cfg(test)]
mod test {
    use super::*;
    use std::str::FromStr;

    /// The base URL is tagged with `transport=polling` on construction and on
    /// replacement, without duplicating an already-present tag.
    #[test]
    fn polling_transport_base_url() -> Result<()> {
        let server_url = crate::test::engine_io_server()?.to_string();
        let transport = PollingTransport::new(Url::from_str(&server_url).unwrap(), None, None);

        // Freshly constructed: the server URL plus the polling marker.
        let tagged_server_url = server_url.clone() + "?transport=polling";
        assert_eq!(transport.base_url()?.to_string(), tagged_server_url);

        // Replacing with an untagged URL appends the marker.
        let untagged = Url::parse("https://127.0.0.1")?;
        transport.set_base_url(untagged)?;
        assert_eq!(
            transport.base_url()?.to_string(),
            "https://127.0.0.1/?transport=polling"
        );
        assert_ne!(transport.base_url()?.to_string(), server_url);

        // Replacing with an already tagged URL leaves it untouched.
        let tagged = Url::parse("http://127.0.0.1/?transport=polling")?;
        transport.set_base_url(tagged)?;
        assert_eq!(
            transport.base_url()?.to_string(),
            "http://127.0.0.1/?transport=polling"
        );
        assert_ne!(transport.base_url()?.to_string(), server_url);

        Ok(())
    }

    /// Checks the `Debug` rendering of the concrete transport and of the
    /// boxed trait object.
    #[test]
    fn transport_debug() -> Result<()> {
        let mut url = crate::test::engine_io_server()?;
        let transport =
            PollingTransport::new(Url::from_str(&url.to_string()).unwrap(), None, None);
        url.query_pairs_mut().append_pair("transport", "polling");

        let expected_concrete = format!(
            "PollingTransport {{ client: {:?}, base_url: RwLock {{ data: {:?}, poisoned: false, .. }} }}",
            transport.client, url
        );
        assert_eq!(expected_concrete, format!("{:?}", transport));

        let boxed: Box<dyn Transport> = Box::new(transport);
        let expected_boxed = format!("Transport(base_url: Ok({:?}))", url);
        assert_eq!(expected_boxed, format!("{:?}", boxed));

        Ok(())
    }
}