Skip to main content

reqrio/
scq.rs

1use std::io::Cursor;
2use crate::body::BodyType;
3use crate::ext::{ReqParam, ReqPriExt};
4use crate::hpack::HPackCoding;
5use crate::packet::{FrameFlag, H2FrameRBuf};
6use crate::reader::{ReadExt, Reader};
7use crate::request::RequestBuffer;
8use crate::stream::{ConnParam, Stream};
9use crate::*;
10use json::JsonValue;
11use std::mem;
12
13#[repr(C)]
14pub struct ScReq {
15    header: Header,
16    scheme: Scheme,
17    addr: Addr,
18    stream: Stream,
19    body: BodyType,
20    callback: Option<ReqCallback>,
21    timeout: Timeout,
22    stream_id: u32,
23    proxy: Proxy,
24    fingerprint: Fingerprint,
25    verify: bool,
26    auto_redirect: bool,
27    buffer: Buffer,
28    certs: Vec<Certificate>,
29    key: RsaKey,
30}
31
32impl Default for ScReq {
33    fn default() -> Self {
34        ScReq {
35            header: Header::new_req_h1(),
36            scheme: Scheme::Http,
37            addr: Addr::default(),
38            stream: Stream::NonConnection,
39            body: BodyType::new_byte(vec![]),
40            callback: None,
41            timeout: Timeout::new(),
42            stream_id: 0,
43            proxy: Proxy::Null,
44            fingerprint: Fingerprint::default(),
45            verify: true,
46            auto_redirect: true,
47            buffer: Buffer::with_capacity(32826),
48            certs: vec![],
49            key: RsaKey::none(),
50        }
51    }
52}
53
54impl ScReq {
55    pub fn new() -> ScReq {
56        ScReq::default()
57    }
58
59    pub fn get(&mut self) -> HlsResult<Response> {
60        self.header.set_method(Method::GET);
61        self.stream_io()
62    }
63
64    pub fn post(&mut self) -> HlsResult<Response> {
65        self.header.set_method(Method::POST);
66        self.stream_io()
67    }
68
69    pub fn put(&mut self) -> HlsResult<Response> {
70        self.header.set_method(Method::PUT);
71        self.stream_io()
72    }
73
74    pub fn options(&mut self) -> HlsResult<Response> {
75        self.header.set_method(Method::OPTIONS);
76        self.stream_io()
77    }
78
79    pub fn delete(&mut self) -> HlsResult<Response> {
80        self.header.set_method(Method::DELETE);
81        self.stream_io()
82    }
83
84    pub fn head(&mut self) -> HlsResult<Response> {
85        self.header.set_method(Method::HEAD);
86        self.stream_io()
87    }
88
89    pub fn trace(&mut self) -> HlsResult<Response> {
90        self.header.set_method(Method::TRACE);
91        self.stream_io()
92    }
93
94    pub fn patch(&mut self) -> HlsResult<Response> {
95        self.header.set_method(Method::PATCH);
96        self.stream_io()
97    }
98
99    pub fn h1_io_by_raw(&mut self, context: impl AsRef<[u8]>) -> HlsResult<Response> {
100        self.buffer.write_slice(context.as_ref());
101        self.h1_io()
102    }
103
104    pub fn h1_io(&mut self) -> HlsResult<Response> {
105        let mut response = Response::new();
106        let mut read_len = 0;
107        loop {
108            self.stream.sync_read(&mut self.buffer)?;
109            if self.handle_h1_res(&mut response, &mut read_len)? { break; }
110        }
111        Ok(response)
112    }
113
114    fn handle_io(&mut self) -> HlsResult<Response> {
115        let mut request = RequestBuffer::new(&mut self.header, &self.addr, &self.scheme, &self.stream_id, &mut self.body);
116        self.buffer.reset();
117        loop {
118            let mut render = Reader::new(self.buffer.unfilled_mut());
119            let len = request.read(&mut render)?;
120            if len == 0 { break; }
121            self.stream.sync_write(render.filled())?;
122        }
123        let response = match self.header.alpn() {
124            ALPN::Http20 => self.h2c_io(),
125            _ => self.h1_io()
126        }?;
127        self.update_cookie(&response);
128        self.callback = None;
129        if let ALPN::Http20 = self.header.alpn() { self.stream_id += 2; }
130        Ok(response)
131    }
132
133    pub fn stream_io(&mut self) -> HlsResult<Response> {
134        for i in 0..self.timeout.handle_times() {
135            let res = self.handle_io();
136            self.buffer.reset();
137            match res {
138                Ok(res) => {
139                    let code = res.header().status().code();
140                    return if self.auto_redirect && (300..400).contains(&code) {
141                        let location = res.header().location().ok_or("missing location")?;
142                        if location.starts_with("http") {
143                            self.set_url(location)?;
144                        } else {
145                            self.header.set_uri(Uri::try_from(location)?);
146                        }
147                        self.header.set_method(Method::GET);
148                        self.stream_io()
149                    } else {
150                        Ok(res)
151                    };
152                }
153                Err(e) => if i != self.timeout.handle_times() - 1 {
154                    if self.timeout.is_peer_closed(e.to_string()) {
155                        self.re_conn()?;
156                    }
157                    println!("[ScReq] write/recv error, error: {}, handle: {}/{}", e, i + 2, self.timeout.handle_times());
158                    continue;
159                }
160            }
161        }
162        Err("stream io error".into())
163    }
164
165    pub fn re_conn(&mut self) -> HlsResult<()> {
166        *self.header.hpack_coder() = HPackCoding::new();
167        self.stream_id = 0;
168        self.buffer.reset();
169        for i in 0..self.timeout.connect_times() {
170            let param = ConnParam {
171                scheme: &self.scheme,
172                addr: &self.addr,
173                proxy: &self.proxy,
174                timeout: &self.timeout,
175                fingerprint: &mut self.fingerprint,
176                alpn: self.header.alpn(),
177                verify: self.verify,
178                cert: &mut self.certs,
179                key: &self.key,
180            };
181            match self.stream.sync_conn(param) {
182                Ok(alpn) => {
183                    self.header.init_by_alpn(alpn);
184                    if self.header.alpn() == &ALPN::Http20 { self.handle_h2_setting()?; }
185                    return Ok(());
186                }
187                Err(e) => if i != self.timeout.connect_times() - 1 {
188                    println!("[ScReq] continue with error-{}, handle: {}/{}", e, i + 2, self.timeout.handle_times());
189                    continue;
190                }
191            }
192        }
193        Err("[ScReq] connection error".into())
194    }
195
196    pub fn with_url(mut self, url: impl AsRef<str>) -> HlsResult<Self> {
197        self.set_url(url)?;
198        Ok(self)
199    }
200
201    pub fn with_fingerprint(mut self, fingerprint: Fingerprint) -> Self {
202        self.fingerprint = fingerprint;
203        self
204    }
205
206    pub fn set_fingerprint(&mut self, fingerprint: Fingerprint) {
207        self.fingerprint = fingerprint;
208    }
209
210    pub fn new_with_url(url: impl AsRef<str>) -> HlsResult<ScReq> {
211        let mut res = Self::new();
212        res.set_url(url)?;
213        Ok(res)
214    }
215
216    pub fn set_url(&mut self, url: impl AsRef<str>) -> HlsResult<()> {
217        let (scheme, addr, uri) = Url::try_from(url.as_ref())?.into_inner();
218        let old_addr = mem::replace(&mut self.addr, addr);
219        let old_scheme = mem::replace(&mut self.scheme, scheme);
220        drop(mem::replace(&mut self.body, BodyType::Bytes(Cursor::new(vec![]))));
221        self.header.set_uri(uri);
222        if old_addr.host() != self.addr.host() || self.scheme != old_scheme {
223            let host = self.addr.to_string().replace(":80", "").replace(":443", "");
224            self.header.set_host(host)?;
225            self.re_conn()?;
226        }
227        Ok(())
228    }
229
230    pub fn send_check(&mut self, method: Method) -> HlsResult<Response> {
231        self.header.set_method(method);
232        let response = self.stream_io()?;
233        self.check_status(&response)?;
234        Ok(response)
235    }
236
237    pub fn send_check_json(&mut self, method: Method, k: impl AsRef<str>, v: impl ToString, e: Vec<impl AsRef<str>>) -> HlsResult<JsonValue> {
238        let response = self.send_check(method)?;
239        self.check_res(response, k, v, e)
240    }
241}
242
243impl ScReq {
244    pub fn handle_h2_setting(&mut self) -> HlsResult<()> {
245        self.buffer.write_slice(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
246        self.buffer.write_slice(self.fingerprint.h2_setting());
247        self.buffer.write_slice(self.fingerprint.h2_window_update());
248        self.stream.sync_write(self.buffer.filled())?;
249        self.buffer.reset();
250        self.stream_id += 1;
251        Ok(())
252    }
253
254    pub fn h2c_io(&mut self) -> HlsResult<Response> {
255        let mut response = Response::new();
256        loop {
257            self.stream.sync_read(&mut self.buffer)?;
258            while let Ok((frame_type, frame_flag, frame_len)) = H2FrameRBuf::buffer_enough(&self.buffer) {
259                if frame_type == FrameType::Settings && frame_flag.end_stream() {
260                    let mut end_frame = H2Frame::none_frame();
261                    end_frame.set_frame_type(FrameType::Settings);
262                    end_frame.set_flag(FrameFlag::EndStream);
263                    self.stream.sync_write(end_frame.to_bytes().as_ref())?;
264                    self.buffer.move_to(frame_len..self.buffer.len(), 0);
265                    continue;
266                }
267                if self.handle_h2_res(frame_type, &mut response)? { return Ok(response); }
268            }
269        }
270    }
271}
272
273impl ReqGenExt for ScReq {}
274
275impl ReqPriExt for ScReq {
276    fn into_stream(self) -> Stream {
277        self.stream
278    }
279
280    fn req_param(&mut self) -> ReqParam<'_> {
281        ReqParam {
282            header: &mut self.header,
283            buffer: &mut self.buffer,
284            callback: &mut self.callback,
285        }
286    }
287
288    fn body_type_mut(&mut self) -> &mut BodyType {
289        &mut self.body
290    }
291}
292
293impl ReqExt for ScReq {
294    fn header_mut(&mut self) -> &mut Header {
295        &mut self.header
296    }
297
298    fn header(&self) -> &Header {
299        &self.header
300    }
301
302    fn set_timeout(&mut self, timeout: Timeout) {
303        self.timeout = timeout;
304    }
305
306    fn timeout(&self) -> &Timeout {
307        &self.timeout
308    }
309
310    fn timeout_mut(&mut self) -> &mut Timeout {
311        &mut self.timeout
312    }
313
314    fn url(&self) -> String {
315        format!("{}://{}{}", self.scheme, self.addr, self.header.uri()).replace(":80", "").replace(":443", "")
316    }
317
318    fn set_proxy(&mut self, proxy: Proxy) {
319        self.proxy = proxy;
320    }
321
322    fn set_verify(&mut self, verify: bool) {
323        self.verify = verify;
324    }
325
326    fn set_auto_redirect(&mut self, auto_redirect: bool) {
327        self.auto_redirect = auto_redirect;
328    }
329
330    fn set_mtls(&mut self, certs: Vec<Certificate>, key: RsaKey) {
331        self.certs = certs;
332        self.key = key;
333    }
334
335    fn set_callback(&mut self, callback: impl FnMut(&[u8]) -> HlsResult<()> + 'static) {
336        self.callback = Some(Box::new(callback));
337    }
338
339    fn set_fingerprint(&mut self, fingerprint: Fingerprint) {
340        self.fingerprint = fingerprint;
341    }
342}
343
344#[cfg(feature = "export")]
345unsafe impl Send for ScReq {}