1use reqrio_json::JsonValue;
2use crate::error::HlsResult;
3use crate::ext::ReqPriExt;
4use crate::stream::Stream;
5use crate::*;
6
7pub struct WebSocketBuilder<S: ReqExt>(S);
8
9impl<S: ReqExt> WebSocketBuilder<S> {
10 pub fn with_proxy(mut self, proxy: Proxy) -> WebSocketBuilder<S> {
11 self.0.set_proxy(proxy);
12 self
13 }
14
15 pub fn set_proxy(&mut self, proxy: Proxy) {
16 self.0.set_proxy(proxy);
17 }
18
19 pub fn with_origin(mut self, origin: impl ToString) -> HlsResult<WebSocketBuilder<S>> {
20 self.0.header_mut().set_origin(origin)?;
21 Ok(self)
22 }
23
24 pub fn with_cookie(mut self, cookie: impl AsRef<str>) -> HlsResult<WebSocketBuilder<S>> {
25 self.0.header_mut().set_cookie(cookie)?;
26 Ok(self)
27 }
28
29 pub fn with_user_agent(mut self, user_agent: impl ToString) -> HlsResult<WebSocketBuilder<S>> {
30 self.0.header_mut().set_user_agent(user_agent)?;
31 Ok(self)
32 }
33
34 pub fn with_header(mut self, key: impl AsRef<str>, val: impl ToString) -> HlsResult<WebSocketBuilder<S>> {
35 self.add_header(key, val)?;
36 Ok(self)
37 }
38
39 pub fn add_header(&mut self, key: impl AsRef<str>, val: impl ToString) -> HlsResult<()> {
40 self.0.header_mut().insert(key, val)
41 }
42
43 pub fn set_uri(&mut self, uri: impl TryInto<Uri>) -> Result<(), RlsError> {
44 self.0.set_uri(uri)
45 }
46
47 pub fn with_uri(mut self, uri: impl TryInto<Uri>) -> HlsResult<WebSocketBuilder<S>> {
48 self.0.set_uri(uri)?;
49 Ok(self)
50 }
51
52 pub fn with_params(mut self, param: JsonValue) -> Self {
53 self.set_params(param);
54 self
55 }
56
57 pub fn set_params(&mut self, param: JsonValue) {
58 self.0.set_params(param)
59 }
60}
61
62impl WebSocketBuilder<ScReq> {
63 pub fn build(mut self) -> HlsResult<WebSocket> {
64 self.0.re_conn()?;
65 WebSocket::add_header(self.0.header_mut())?;
66 Ok(WebSocket::new(WebSocket::connect_sync(self.0)?))
67 }
68
69 pub fn set_url(&mut self, url: impl AsRef<str>) -> HlsResult<()> {
70 self.0.set_url(url)?;
71 Ok(())
72 }
73
74 pub fn with_url(mut self, url: impl AsRef<str>) -> HlsResult<Self> {
75 self.set_url(url)?;
76 Ok(self)
77 }
78}
79
/// Asynchronous builder operations.
// NOTE(review): the feature is spelled "aync" here; confirm it matches the
// feature name declared in Cargo.toml before renaming anything.
#[cfg(feature = "aync")]
impl WebSocketBuilder<AcReq> {
    /// Performs the handshake and returns the connected [`WebSocket`].
    ///
    /// The request is (re)connected first, then the WebSocket upgrade headers
    /// are filled in, and finally the handshake request is sent.
    ///
    /// # Errors
    /// Fails if reconnecting, filling the headers, or the handshake fails.
    pub async fn build(mut self) -> HlsResult<WebSocket> {
        self.0.re_conn().await?;
        let headers = self.0.header_mut();
        WebSocket::add_header(headers)?;
        let stream = WebSocket::connect_async(self.0).await?;
        Ok(WebSocket::new(stream))
    }

    /// Sets the target URL on the wrapped request.
    ///
    /// # Errors
    /// Propagates the error returned by the wrapped request's `set_url`.
    pub async fn set_async_url(&mut self, url: impl AsRef<str>) -> HlsResult<()> {
        self.0.set_url(url).await?;
        Ok(())
    }

    /// Chaining variant of [`Self::set_async_url`].
    ///
    /// # Errors
    /// Propagates the error returned by the wrapped request's `set_url`.
    pub async fn with_async_url(mut self, url: impl AsRef<str>) -> HlsResult<Self> {
        self.0.set_url(url).await?;
        Ok(self)
    }
}
98
99
/// A WebSocket connection: the stream obtained from the handshake plus the
/// buffer that incoming bytes are read into before frames are parsed.
///
/// With the `export` feature enabled the struct uses the C layout
/// (`repr(C)`), so the field order below must not be changed.
#[cfg_attr(feature = "export", repr(C))]
pub struct WebSocket {
    /// Stream taken from the request once the handshake has succeeded.
    stream: Stream,
    /// Receive buffer that frames are parsed from.
    buffer: Buffer,
}
105
106impl WebSocket {
107 fn add_header(headers: &mut Header) -> HlsResult<()> {
108 match headers.get_mut("Sec-WebSocket-Key") {
109 None => headers.insert("Sec-WebSocket-Key", "3eGwJ19k4qUKxRPJZUNYLw==")?,
110 Some(value) => if value.to_string() == "" { *value = HeaderValue::String("3eGwJ19k4qUKxRPJZUNYLw==".to_string()) }
111 }
112 match headers.get_mut("Connection") {
113 None => headers.set_connection("Upgrade")?,
114 Some(value) => if value.to_string() == "" { headers.set_connection("Upgrade")? }
115 }
116 match headers.get_mut("Sec-WebSocket-Version") {
117 None => headers.insert("Sec-WebSocket-Version", "13")?,
118 Some(value) => if value.to_string() == "" { *value = HeaderValue::String("13".to_string()) }
119 }
120 match headers.get_mut("Sec-WebSocket-Extensions") {
121 None => headers.insert("Sec-WebSocket-Extensions", "permessage-deflate; client_max_window_bits")?,
122 Some(value) => if value.to_string() == "" { *value = HeaderValue::String("permessage-deflate; client_max_window_bits".to_string()) }
123 }
124 match headers.get_mut("Upgrade") {
125 None => headers.insert("Upgrade", "websocket")?,
126 Some(value) => if value.to_string() == "" { *value = HeaderValue::String("websocket".to_string()) }
127 }
128 Ok(())
129 }
130}
131
132impl WebSocket {
133 fn new(stream: Stream) -> Self {
134 WebSocket {
135 stream,
136 buffer: Buffer::with_capacity(0xFFFF),
137 }
138 }
139}
140
141impl WebSocket {
142 pub fn sync_build() -> WebSocketBuilder<ScReq> {
143 WebSocketBuilder(ScReq::new().with_timeout(Timeout::longer()).with_alpn(ALPN::Http11))
144 }
145
146
147 fn connect_sync(mut req: ScReq) -> HlsResult<Stream> {
148 let resp = req.handle_io()?;
149 let status = resp.header().status();
150 if status != &HttpStatus::SwitchingProtocols { return Err(format!("Connect fail with code-{}", status).into()); }
151 Ok(req.into_stream())
152 }
153
154 pub fn open(url: impl AsRef<str>) -> HlsResult<WebSocket> {
155 Self::sync_build().with_url(url)?.build()
156 }
157
158 pub fn open_raw(url: impl AsRef<str>, context: impl AsRef<[u8]>) -> HlsResult<WebSocket> {
159 let mut req = ScReq::new().with_timeout(Timeout::longer()).with_url(url)?;
160 req.req_param().buffer.write_slice(context.as_ref());
161 Ok(WebSocket::new(Self::connect_sync(req)?))
162 }
163
164
165 pub fn write_frame(&mut self, frame: WsFrame) -> HlsResult<()> {
166 self.stream.sync_write(&frame.to_bytes())
167 }
168
169 pub fn read_frame(&mut self) -> HlsResult<WsFrame> {
170 if let Ok(frame) = WsFrame::from_buffer(&mut self.buffer) {
171 return Ok(frame);
172 }
173 loop {
174 self.stream.sync_read(&mut self.buffer)?;
175 if let Ok(frame) = WsFrame::from_buffer(&mut self.buffer) {
176 return Ok(frame);
177 }
178 }
179 }
180
181 pub fn shutdown(mut self) -> HlsResult<()> {
182 self.stream.sync_shutdown()
183 }
184}
185
/// Asynchronous WebSocket API.
#[cfg(feature = "aync")]
impl WebSocket {
    /// Starts a builder around an asynchronous request configured with the
    /// longer timeout preset and HTTP/1.1 ALPN.
    pub fn async_build() -> WebSocketBuilder<AcReq> {
        WebSocketBuilder(AcReq::new().with_timeout(Timeout::longer()).with_alpn(ALPN::Http11))
    }

    /// Sends the handshake request and, when the server replies with
    /// `SwitchingProtocols`, turns the request into its underlying stream.
    ///
    /// The raw response is no longer printed to stdout: a library must not
    /// write handshake dumps to the host application's standard output, and
    /// the synchronous counterpart never did.
    ///
    /// # Errors
    /// Fails if the request I/O fails or the response status is anything
    /// other than `SwitchingProtocols`.
    async fn connect_async(mut req: AcReq) -> HlsResult<Stream> {
        let resp = req.h1_io().await?;
        let status = resp.header().status();
        if status != &HttpStatus::SwitchingProtocols {
            return Err(format!("Connect fail with code-{}", status).into());
        }
        Ok(req.into_stream())
    }

    /// Connects to `url` using the default builder settings.
    ///
    /// # Errors
    /// Fails if the URL is rejected or the handshake does not succeed.
    pub async fn open_async(url: impl AsRef<str>) -> HlsResult<WebSocket> {
        Self::async_build().with_async_url(url).await?.build().await
    }

    /// Connects to `url`, writing the caller-supplied `context` bytes into
    /// the request buffer instead of filling in the upgrade headers.
    ///
    /// # Errors
    /// Fails if the URL is rejected or the handshake does not succeed.
    pub async fn open_async_raw(url: impl AsRef<str>, context: impl AsRef<[u8]>) -> HlsResult<WebSocket> {
        let mut req = AcReq::new().with_timeout(Timeout::longer()).with_url(url).await?;
        req.req_param().buffer.write_slice(context.as_ref());
        Ok(WebSocket::new(Self::connect_async(req).await?))
    }

    /// Serialises `frame` and writes it to the stream.
    ///
    /// # Errors
    /// Propagates the stream write error.
    pub async fn async_write_frame(&mut self, frame: WsFrame) -> HlsResult<()> {
        self.stream.async_write(&frame.to_bytes()).await
    }

    /// Returns the next frame, reading from the stream until one can be
    /// parsed out of the receive buffer.
    ///
    /// A frame already sitting in the buffer is returned without touching
    /// the stream. Parse failures are treated as "need more data".
    ///
    /// # Errors
    /// Propagates the stream read error.
    pub async fn async_read_frame(&mut self) -> HlsResult<WsFrame> {
        if let Ok(frame) = WsFrame::from_buffer(&mut self.buffer) {
            return Ok(frame);
        }
        loop {
            self.stream.async_read(&mut self.buffer).await?;
            if let Ok(frame) = WsFrame::from_buffer(&mut self.buffer) {
                return Ok(frame);
            }
        }
    }

    /// Consumes the socket and shuts the stream down.
    ///
    /// # Errors
    /// Propagates the stream shutdown error.
    pub async fn async_shutdown(mut self) -> HlsResult<()> {
        self.stream.async_shutdown().await
    }
}