tf_rust_engineio/transports/
websocket.rs1use crate::{
2 asynchronous::{
3 async_transports::WebsocketTransport as AsyncWebsocketTransport, transport::AsyncTransport,
4 },
5 error::Result,
6 transport::Transport,
7 Error,
8};
9use bytes::Bytes;
10use http::HeaderMap;
11use std::{sync::Arc, time::Duration};
12use tokio::runtime::Runtime;
13use url::Url;
14
15#[derive(Clone)]
16pub struct WebsocketTransport {
17 runtime: Arc<Runtime>,
18 inner: Arc<AsyncWebsocketTransport>,
19}
20
21impl WebsocketTransport {
22 pub fn new(base_url: Url, headers: Option<HeaderMap>) -> Result<Self> {
24 let runtime = tokio::runtime::Builder::new_current_thread()
25 .enable_all()
26 .build()?;
27
28 let inner = runtime.block_on(AsyncWebsocketTransport::new(base_url, headers))?;
29
30 Ok(WebsocketTransport {
31 runtime: Arc::new(runtime),
32 inner: Arc::new(inner),
33 })
34 }
35
36 pub(crate) fn upgrade(&self) -> Result<()> {
39 self.runtime.block_on(async { self.inner.upgrade().await })
40 }
41}
42
43impl Transport for WebsocketTransport {
44 fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
45 self.runtime
46 .block_on(async { self.inner.emit(data, is_binary_att).await })
47 }
48
49 fn poll(&self, timeout: Duration) -> Result<Bytes> {
50 self.runtime.block_on(async {
51 let r = match tokio::time::timeout(timeout, self.inner.poll_next()).await {
52 Ok(r) => r,
53 Err(_) => return Err(Error::PingTimeout()),
54 };
55 match r {
56 Ok(b) => b.ok_or(Error::IncompletePacket()),
57 Err(_) => Err(Error::IncompletePacket()),
58 }
59 })
60 }
61
62 fn base_url(&self) -> Result<url::Url> {
63 self.runtime.block_on(async { self.inner.base_url().await })
64 }
65
66 fn set_base_url(&self, url: url::Url) -> Result<()> {
67 self.runtime
68 .block_on(async { self.inner.set_base_url(url).await })
69 }
70}
71
72impl std::fmt::Debug for WebsocketTransport {
73 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
74 f.write_fmt(format_args!(
75 "WebsocketTransport(base_url: {:?})",
76 self.base_url(),
77 ))
78 }
79}
80
81#[cfg(test)]
82mod test {
83 use super::*;
84 use crate::ENGINE_IO_VERSION;
85 use std::str::FromStr;
86
87 const TIMEOUT_DURATION: Duration = Duration::from_secs(45);
88
89 fn new() -> Result<WebsocketTransport> {
90 let url = crate::test::engine_io_server()?.to_string()
91 + "engine.io/?EIO="
92 + &ENGINE_IO_VERSION.to_string();
93 WebsocketTransport::new(Url::from_str(&url[..])?, None)
94 }
95
96 #[test]
97 fn websocket_transport_base_url() -> Result<()> {
98 let transport = new()?;
99 let mut url = crate::test::engine_io_server()?;
100 url.set_path("/engine.io/");
101 url.query_pairs_mut()
102 .append_pair("EIO", &ENGINE_IO_VERSION.to_string())
103 .append_pair("transport", "websocket");
104 url.set_scheme("ws").unwrap();
105 assert_eq!(transport.base_url()?.to_string(), url.to_string());
106 transport.set_base_url(reqwest::Url::parse("https://127.0.0.1")?)?;
107 assert_eq!(
108 transport.base_url()?.to_string(),
109 "ws://127.0.0.1/?transport=websocket"
110 );
111 assert_ne!(transport.base_url()?.to_string(), url.to_string());
112
113 transport.set_base_url(reqwest::Url::parse(
114 "http://127.0.0.1/?transport=websocket",
115 )?)?;
116 assert_eq!(
117 transport.base_url()?.to_string(),
118 "ws://127.0.0.1/?transport=websocket"
119 );
120 assert_ne!(transport.base_url()?.to_string(), url.to_string());
121 Ok(())
122 }
123
124 #[test]
125 fn websocket_secure_debug() -> Result<()> {
126 let transport = new()?;
127 assert_eq!(
128 format!("{:?}", transport),
129 format!("WebsocketTransport(base_url: {:?})", transport.base_url())
130 );
131 println!("{:?}", transport.poll(TIMEOUT_DURATION).unwrap());
132 println!("{:?}", transport.poll(TIMEOUT_DURATION).unwrap());
133 Ok(())
134 }
135}