Skip to main content

reqrio/stream/
ws.rs

1use reqrio_json::JsonValue;
2use crate::error::HlsResult;
3use crate::ext::ReqPriExt;
4use crate::stream::Stream;
5use crate::*;
6
7pub struct WebSocketBuilder<S: ReqExt>(S);
8
9impl<S: ReqExt> WebSocketBuilder<S> {
10    pub fn with_proxy(mut self, proxy: Proxy) -> WebSocketBuilder<S> {
11        self.0.set_proxy(proxy);
12        self
13    }
14
15    pub fn set_proxy(&mut self, proxy: Proxy) {
16        self.0.set_proxy(proxy);
17    }
18
19    pub fn with_origin(mut self, origin: impl ToString) -> HlsResult<WebSocketBuilder<S>> {
20        self.0.header_mut().set_origin(origin)?;
21        Ok(self)
22    }
23
24    pub fn with_cookie(mut self, cookie: impl AsRef<str>) -> HlsResult<WebSocketBuilder<S>> {
25        self.0.header_mut().set_cookie(cookie)?;
26        Ok(self)
27    }
28
29    pub fn with_user_agent(mut self, user_agent: impl ToString) -> HlsResult<WebSocketBuilder<S>> {
30        self.0.header_mut().set_user_agent(user_agent)?;
31        Ok(self)
32    }
33
34    pub fn with_header(mut self, key: impl AsRef<str>, val: impl ToString) -> HlsResult<WebSocketBuilder<S>> {
35        self.add_header(key, val)?;
36        Ok(self)
37    }
38
39    pub fn add_header(&mut self, key: impl AsRef<str>, val: impl ToString) -> HlsResult<()> {
40        self.0.header_mut().insert(key, val)
41    }
42
43    pub fn set_uri(&mut self, uri: impl TryInto<Uri>) -> Result<(), RlsError> {
44        self.0.set_uri(uri)
45    }
46
47    pub fn with_uri(mut self, uri: impl TryInto<Uri>) -> HlsResult<WebSocketBuilder<S>> {
48        self.0.set_uri(uri)?;
49        Ok(self)
50    }
51
52    pub fn with_params(mut self, param: JsonValue) -> Self {
53        self.set_params(param);
54        self
55    }
56
57    pub fn set_params(&mut self, param: JsonValue) {
58        self.0.set_params(param)
59    }
60}
61
62impl WebSocketBuilder<ScReq> {
63    pub fn build(mut self) -> HlsResult<WebSocket> {
64        self.0.re_conn()?;
65        WebSocket::add_header(self.0.header_mut())?;
66        Ok(WebSocket::new(WebSocket::connect_sync(self.0)?))
67    }
68
69    pub fn set_url(&mut self, url: impl AsRef<str>) -> HlsResult<()> {
70        self.0.set_url(url)?;
71        Ok(())
72    }
73
74    pub fn with_url(mut self, url: impl AsRef<str>) -> HlsResult<Self> {
75        self.set_url(url)?;
76        Ok(self)
77    }
78}
79
80#[cfg(feature = "aync")]
81impl WebSocketBuilder<AcReq> {
82    pub async fn build(mut self) -> HlsResult<WebSocket> {
83        self.0.re_conn().await?;
84        WebSocket::add_header(self.0.header_mut())?;
85        Ok(WebSocket::new(WebSocket::connect_async(self.0).await?))
86    }
87
88    pub async fn set_async_url(&mut self, url: impl AsRef<str>) -> HlsResult<()> {
89        self.0.set_url(url).await?;
90        Ok(())
91    }
92
93    pub async fn with_async_url(mut self, url: impl AsRef<str>) -> HlsResult<Self> {
94        self.set_async_url(url).await?;
95        Ok(self)
96    }
97}
98
99
100#[cfg_attr(feature = "export", repr(C))]
101pub struct WebSocket {
102    stream: Stream,
103    buffer: Buffer,
104}
105
106impl WebSocket {
107    fn add_header(headers: &mut Header) -> HlsResult<()> {
108        match headers.get_mut("Sec-WebSocket-Key") {
109            None => headers.insert("Sec-WebSocket-Key", "3eGwJ19k4qUKxRPJZUNYLw==")?,
110            Some(value) => if value.to_string() == "" { *value = HeaderValue::String("3eGwJ19k4qUKxRPJZUNYLw==".to_string()) }
111        }
112        match headers.get_mut("Connection") {
113            None => headers.set_connection("Upgrade")?,
114            Some(value) => if value.to_string() == "" { headers.set_connection("Upgrade")? }
115        }
116        match headers.get_mut("Sec-WebSocket-Version") {
117            None => headers.insert("Sec-WebSocket-Version", "13")?,
118            Some(value) => if value.to_string() == "" { *value = HeaderValue::String("13".to_string()) }
119        }
120        match headers.get_mut("Sec-WebSocket-Extensions") {
121            None => headers.insert("Sec-WebSocket-Extensions", "permessage-deflate; client_max_window_bits")?,
122            Some(value) => if value.to_string() == "" { *value = HeaderValue::String("permessage-deflate; client_max_window_bits".to_string()) }
123        }
124        match headers.get_mut("Upgrade") {
125            None => headers.insert("Upgrade", "websocket")?,
126            Some(value) => if value.to_string() == "" { *value = HeaderValue::String("websocket".to_string()) }
127        }
128        Ok(())
129    }
130}
131
132impl WebSocket {
133    fn new(stream: Stream) -> Self {
134        WebSocket {
135            stream,
136            buffer: Buffer::with_capacity(0xFFFF),
137        }
138    }
139}
140
141impl WebSocket {
142    pub fn sync_build() -> WebSocketBuilder<ScReq> {
143        WebSocketBuilder(ScReq::new().with_timeout(Timeout::longer()).with_alpn(ALPN::Http11))
144    }
145
146
147    fn connect_sync(mut req: ScReq) -> HlsResult<Stream> {
148        let resp = req.handle_io()?;
149        let status = resp.header().status();
150        if status != &HttpStatus::SwitchingProtocols { return Err(format!("Connect fail with code-{}", status).into()); }
151        Ok(req.into_stream())
152    }
153
154    pub fn open(url: impl AsRef<str>) -> HlsResult<WebSocket> {
155        Self::sync_build().with_url(url)?.build()
156    }
157
158    pub fn open_raw(url: impl AsRef<str>, context: impl AsRef<[u8]>) -> HlsResult<WebSocket> {
159        let mut req = ScReq::new().with_timeout(Timeout::longer()).with_url(url)?;
160        req.req_param().buffer.write_slice(context.as_ref());
161        Ok(WebSocket::new(Self::connect_sync(req)?))
162    }
163
164
165    pub fn write_frame(&mut self, frame: WsFrame) -> HlsResult<()> {
166        self.stream.sync_write(&frame.to_bytes())
167    }
168
169    pub fn read_frame(&mut self) -> HlsResult<WsFrame> {
170        if let Ok(frame) = WsFrame::from_buffer(&mut self.buffer) {
171            return Ok(frame);
172        }
173        loop {
174            self.stream.sync_read(&mut self.buffer)?;
175            if let Ok(frame) = WsFrame::from_buffer(&mut self.buffer) {
176                return Ok(frame);
177            }
178        }
179    }
180
181    pub fn shutdown(mut self) -> HlsResult<()> {
182        self.stream.sync_shutdown()
183    }
184}
185
186#[cfg(feature = "aync")]
187impl WebSocket {
188    pub fn async_build() -> WebSocketBuilder<AcReq> {
189        WebSocketBuilder(AcReq::new().with_timeout(Timeout::longer()).with_alpn(ALPN::Http11))
190    }
191
192    async fn connect_async(mut req: AcReq) -> HlsResult<Stream> {
193        let resp = req.h1_io().await?;
194        println!("{}", resp.raw_string());
195        let status = resp.header().status();
196        if status != &HttpStatus::SwitchingProtocols { return Err(format!("Connect fail with code-{}", status).into()); }
197        Ok(req.into_stream())
198    }
199
200    pub async fn open_async(url: impl AsRef<str>) -> HlsResult<WebSocket> {
201        Self::async_build().with_async_url(url).await?.build().await
202    }
203
204    pub async fn open_async_raw(url: impl AsRef<str>, context: impl AsRef<[u8]>) -> HlsResult<WebSocket> {
205        let mut req = AcReq::new().with_timeout(Timeout::longer()).with_url(url).await?;
206        req.req_param().buffer.write_slice(context.as_ref());
207        Ok(WebSocket::new(Self::connect_async(req).await?))
208    }
209
210
211    pub async fn async_write_frame(&mut self, frame: WsFrame) -> HlsResult<()> {
212        self.stream.async_write(&frame.to_bytes()).await
213    }
214
215    pub async fn async_read_frame(&mut self) -> HlsResult<WsFrame> {
216        if let Ok(frame) = WsFrame::from_buffer(&mut self.buffer) {
217            return Ok(frame);
218        }
219        loop {
220            self.stream.async_read(&mut self.buffer).await?;
221            if let Ok(frame) = WsFrame::from_buffer(&mut self.buffer) {
222                return Ok(frame);
223            }
224        }
225    }
226
227    pub async fn async_shutdown(mut self) -> HlsResult<()> {
228        self.stream.async_shutdown().await
229    }
230}