use crate::body::BodyType;
use crate::ext::{ReqParam, ReqPriExt};
use crate::hpack::HPackCoding;
use crate::packet::{FrameFlag, H2FrameRBuf, HeaderParam};
use crate::reader::{ReadExt, Reader};
use crate::request::RequestBuffer;
use crate::stream::{ConnParam, Stream};
use crate::*;
use json::JsonValue;
use std::mem;
/// Request client that owns one connection (`stream`) together with the
/// request header/body and the settings used to (re)establish the connection.
///
/// All I/O in this file goes through the stream's `sync_*` calls.
///
/// NOTE: the struct is `#[repr(C)]`, so fields are laid out in declaration
/// order; do not reorder them without checking whoever depends on the layout.
#[repr(C)]
pub struct ScReq {
/// Request header; created with `Header::new_req_h1()` and re-initialised
/// from the negotiated ALPN after every successful connect.
header: Header,
/// Scheme of the current target, taken from the URL parsed in `set_url`.
scheme: Scheme,
/// Address of the current target, taken from the URL parsed in `set_url`.
addr: Addr,
/// Underlying connection; starts out as `Stream::NonConnection`.
stream: Stream,
/// Request body; `set_url` resets it to an empty byte body.
body: BodyType,
/// Optional callback installed by `set_callback`; cleared after each
/// completed exchange in `handle_io`.
callback: Option<ReqCallback>,
/// Supplies the retry counts (`handle_times`, `connect_times`) and is passed
/// to the connection parameters.
timeout: Timeout,
/// Passed as the stream identifier when encoding a request; set to 1 by
/// `handle_h2_setting` and advanced by 2 after each HTTP/2 exchange.
stream_id: u32,
/// Proxy configuration forwarded to the connection parameters.
proxy: Proxy,
/// Fingerprint forwarded to the connection parameters; also provides the
/// HTTP/2 settings and window-update bytes sent during HTTP/2 setup.
fingerprint: Fingerprint,
/// Forwarded to `ConnParam::verify` (presumably toggles TLS certificate
/// verification; confirm against `Stream::sync_conn`).
verify: bool,
/// When true, `stream_io` follows 3xx responses.
auto_redirect: bool,
/// Scratch buffer shared by the write and read paths.
buffer: Buffer,
/// HPACK coder; recreated by `handle_h2_setting` on every HTTP/2 setup.
hpack_coder: HPackCoding,
/// Client certificates set through `set_mtls`.
certs: Vec<Certificate>,
/// Client key set through `set_mtls`.
key: RsaKey,
/// CA certificates set through `set_mtls` (empty when none were given).
ca_certs: Vec<Certificate>,
}
impl Default for ScReq {
    /// Builds an unconnected client: HTTP/1 request header, `http` scheme,
    /// empty byte body, no proxy, verification and automatic redirects
    /// enabled, and no mTLS material.
    fn default() -> Self {
        Self {
            header: Header::new_req_h1(),
            scheme: Scheme::Http,
            addr: Addr::default(),
            // No connection exists until `re_conn` succeeds.
            stream: Stream::NonConnection,
            body: BodyType::new_byte(Vec::new()),
            callback: None,
            timeout: Timeout::default(),
            stream_id: 0,
            proxy: Proxy::Null,
            fingerprint: Fingerprint::default(),
            verify: true,
            auto_redirect: true,
            buffer: Buffer::with_capacity(32826),
            hpack_coder: HPackCoding::new(4096),
            certs: Vec::new(),
            key: RsaKey::none(),
            ca_certs: Vec::new(),
        }
    }
}
impl ScReq {
pub fn new() -> ScReq {
ScReq::default()
}
/// Sends the current request as `GET` and returns the response.
pub fn get(&mut self) -> HlsResult<Response> {
    self.send_with_method(Method::GET)
}
/// Sends the current request as `POST` and returns the response.
pub fn post(&mut self) -> HlsResult<Response> {
    self.send_with_method(Method::POST)
}
/// Sends the current request as `PUT` and returns the response.
pub fn put(&mut self) -> HlsResult<Response> {
    self.send_with_method(Method::PUT)
}
/// Sends the current request as `OPTIONS` and returns the response.
pub fn options(&mut self) -> HlsResult<Response> {
    self.send_with_method(Method::OPTIONS)
}
/// Sends the current request as `DELETE` and returns the response.
pub fn delete(&mut self) -> HlsResult<Response> {
    self.send_with_method(Method::DELETE)
}
/// Sends the current request as `HEAD` and returns the response.
pub fn head(&mut self) -> HlsResult<Response> {
    self.send_with_method(Method::HEAD)
}
/// Sends the current request as `TRACE` and returns the response.
pub fn trace(&mut self) -> HlsResult<Response> {
    self.send_with_method(Method::TRACE)
}
/// Sends the current request as `PATCH` and returns the response.
pub fn patch(&mut self) -> HlsResult<Response> {
    self.send_with_method(Method::PATCH)
}
/// Shared body of the verb helpers: stores `method` in the header, then
/// performs the exchange through `stream_io`.
fn send_with_method(&mut self, method: Method) -> HlsResult<Response> {
    self.header.set_method(method);
    self.stream_io()
}
/// Reads an HTTP/1 response from the stream.
///
/// Repeatedly fills the internal buffer from the stream and hands it to
/// `handle_h1_res` until that call reports the response as complete.
///
/// # Errors
/// Propagates any error from the stream read or from `handle_h1_res`.
pub fn h1_io(&mut self) -> HlsResult<Response> {
    let mut response = Response::new();
    // Progress counter owned by this call and updated by `handle_h1_res`.
    let mut read_len = 0;
    loop {
        self.stream.sync_read(&mut self.buffer)?;
        let finished = self.handle_h1_res(&mut response, &mut read_len)?;
        if finished {
            return Ok(response);
        }
    }
}
/// Performs one request/response exchange on the current connection.
///
/// Serialises the request chunk by chunk through the internal buffer, writes
/// each chunk to the stream, then reads the response with the HTTP/2 or
/// HTTP/1 reader depending on the header's ALPN. Afterwards it updates
/// cookies from the response, clears the callback and, for HTTP/2, advances
/// the stream id by 2.
///
/// # Errors
/// Propagates any error from request encoding, stream I/O or response parsing.
pub(crate) fn handle_io(&mut self) -> HlsResult<Response> {
    let header_param = HeaderParam {
        addr: &self.addr,
        scheme: &self.scheme,
        stream_identifier: &self.stream_id,
        encoder: self.hpack_coder.encoder(),
        body_len: 0,
    };
    let mut request = RequestBuffer::new(&mut self.header, &mut self.body, header_param)?;
    self.buffer.reset();
    loop {
        // Each pass re-uses the same unfilled region of the buffer.
        let mut chunk = Reader::new(self.buffer.unfilled_mut());
        if request.read(&mut chunk)? == 0 {
            break;
        }
        self.stream.sync_write(chunk.filled())?;
    }
    let response = if let ALPN::Http20 = self.header.alpn() {
        self.h2c_io()?
    } else {
        self.h1_io()?
    };
    self.update_cookie(&response);
    self.callback = None;
    if matches!(self.header.alpn(), ALPN::Http20) {
        self.stream_id += 2;
    }
    Ok(response)
}
/// Sends the request, retrying on failure and following redirects.
///
/// Up to `timeout.handle_times()` attempts are made. After a failed attempt
/// that is not the last one, the connection is re-established when the error
/// is classified as "peer closed", and the attempt is logged.
///
/// When `auto_redirect` is enabled and the response is a 3xx that carries a
/// `Location`, the target is updated (full URL or URI only), the method is
/// switched to `GET` and the request is re-issued. A 3xx response without a
/// `Location` (for example `304 Not Modified`) is returned to the caller
/// as-is instead of being turned into an error.
///
/// # Errors
/// Returns the error of the final failed attempt, or any error raised while
/// reconnecting or applying a redirect target. `"stream io error"` is only
/// returned when `handle_times()` is zero and no attempt was made.
pub fn stream_io(&mut self) -> HlsResult<Response> {
    let attempts = self.timeout.handle_times();
    for i in 0..attempts {
        let outcome = self.handle_io();
        self.buffer.reset();
        match outcome {
            Ok(res) => {
                let code = res.header().status().code();
                let follow_redirect = self.auto_redirect
                    && (300..400).contains(&code)
                    && res.header().location().is_some();
                if !follow_redirect {
                    return Ok(res);
                }
                let location = res.header().location().ok_or("missing location")?;
                if location.starts_with("http") {
                    self.set_url(location)?;
                } else {
                    self.header.set_uri(Uri::try_from(location)?);
                }
                // NOTE(review): every redirect is replayed as GET (including
                // 307/308) and the recursion has no depth limit, so a redirect
                // cycle is not detected here.
                self.header.set_method(Method::GET);
                return self.stream_io();
            }
            Err(e) => {
                if i + 1 == attempts {
                    // Out of attempts: surface the real cause to the caller.
                    return Err(e);
                }
                if self.timeout.is_peer_closed(e.to_string()) {
                    self.re_conn()?;
                }
                println!("[ScReq] write/recv error, error: {}, handle: {}/{}", e, i + 2, attempts);
            }
        }
    }
    Err("stream io error".into())
}
/// (Re)establishes the connection to the current target.
///
/// Resets the internal buffer, then tries `timeout.connect_times()` times to
/// connect with the current scheme, address, proxy, fingerprint and TLS
/// material. On success the header is re-initialised from the negotiated
/// ALPN and, for HTTP/2, the connection preface and settings are sent.
///
/// # Errors
/// Returns `"[ScReq] connection error"` when every attempt failed, or the
/// error from the HTTP/2 setup if the connection itself succeeded.
pub fn re_conn(&mut self) -> HlsResult<()> {
    self.buffer.reset();
    let attempts = self.timeout.connect_times();
    for i in 0..attempts {
        let param = ConnParam {
            scheme: &self.scheme,
            addr: &self.addr,
            proxy: &self.proxy,
            timeout: &self.timeout,
            fingerprint: &mut self.fingerprint,
            alpn: self.header.alpn(),
            verify: self.verify,
            cert: &mut self.certs,
            key: &self.key,
            ca_cert: &self.ca_certs,
        };
        match self.stream.sync_conn(param) {
            Ok(alpn) => {
                self.header.init_by_alpn(alpn);
                if self.header.alpn() == &ALPN::Http20 {
                    self.handle_h2_setting()?;
                }
                return Ok(());
            }
            Err(e) => {
                if i + 1 != attempts {
                    // Report progress against the connect budget, which is
                    // the bound this loop actually iterates over.
                    println!("[ScReq] continue with error-{}, handle: {}/{}", e, i + 2, attempts);
                }
            }
        }
    }
    Err("[ScReq] connection error".into())
}
/// Builder-style variant of `set_url`: applies `url` and hands the client
/// back on success.
///
/// # Errors
/// Returns whatever `set_url` returns on failure.
pub fn with_url(mut self, url: impl AsRef<str>) -> HlsResult<Self> {
    match self.set_url(url) {
        Ok(()) => Ok(self),
        Err(err) => Err(err),
    }
}
pub fn with_fingerprint(mut self, fingerprint: Fingerprint) -> Self {
self.fingerprint = fingerprint;
self
}
pub fn set_fingerprint(&mut self, fingerprint: Fingerprint) {
self.fingerprint = fingerprint;
}
/// Creates a default client and points it at `url`.
///
/// # Errors
/// Returns the error from applying the URL (parsing or connecting).
pub fn new_with_url(url: impl AsRef<str>) -> HlsResult<ScReq> {
    Self::new().with_url(url)
}
/// Points the client at `url`.
///
/// Parses the URL into scheme, address and URI, stores them, resets the body
/// to an empty byte body, and reconnects when the endpoint or the scheme
/// differs from the previous target.
///
/// # Errors
/// Returns the URL parse error, or the error from `re_conn` when a reconnect
/// was required and failed. In the latter case the new scheme/address have
/// already been stored.
pub fn set_url(&mut self, url: impl AsRef<str>) -> HlsResult<()> {
    let (scheme, addr, uri) = Url::try_from(url.as_ref())?.into_inner();
    let old_addr = mem::replace(&mut self.addr, addr);
    let old_scheme = mem::replace(&mut self.scheme, scheme);
    self.body = BodyType::Bytes(vec![]);
    self.header.set_uri(uri);
    // Comparing only the host would keep using the old connection when just
    // the port changes. The rendered address is compared as well; `url()`
    // uses that same rendering as the URL authority, so it presumably
    // includes the port (confirm against `Addr`'s `Display`).
    let endpoint_changed = old_addr.host() != self.addr.host()
        || old_addr.to_string() != self.addr.to_string();
    if endpoint_changed || self.scheme != old_scheme {
        self.re_conn()?;
    }
    Ok(())
}
/// Sends the request with `method` and validates the response status.
///
/// # Errors
/// Returns the error from the exchange itself or from `check_status`.
pub fn send_check(&mut self, method: Method) -> HlsResult<Response> {
    self.header.set_method(method);
    self.stream_io().and_then(|response| {
        self.check_status(&response)?;
        Ok(response)
    })
}
/// Sends the request with `method`, validates the status, then passes the
/// response together with `k`, `v` and `e` to `check_res` and returns its
/// JSON result.
///
/// # Errors
/// Returns the first error from `send_check` or `check_res`.
pub fn send_check_json(&mut self, method: Method, k: impl AsRef<str>, v: impl ToString, e: Vec<impl AsRef<str>>) -> HlsResult<JsonValue> {
    self.send_check(method)
        .and_then(|response| self.check_res(response, k, v, e))
}
}
impl ScReq {
/// Performs the client side of the HTTP/2 connection setup.
///
/// Recreates the HPACK coder, resets the stream id, then sends the client
/// connection preface followed by the fingerprint's settings and
/// window-update bytes in a single write. On success the buffer is cleared
/// and the stream id is left at 1.
///
/// # Errors
/// Propagates the stream write error; in that case the stream id stays at 0
/// and the buffer is not cleared.
pub fn handle_h2_setting(&mut self) -> HlsResult<()> {
    const CLIENT_PREFACE: &[u8; 24] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
    self.hpack_coder = HPackCoding::new(65535);
    self.stream_id = 0;
    self.buffer.write_slice(CLIENT_PREFACE);
    self.buffer.write_slice(self.fingerprint.h2_setting());
    self.buffer.write_slice(self.fingerprint.h2_window_update());
    self.stream.sync_write(self.buffer.filled())?;
    self.buffer.reset();
    // First stream id used by requests on this connection.
    self.stream_id = 1;
    Ok(())
}
/// Reads an HTTP/2 response from the stream.
///
/// Fills the buffer from the stream, then processes every complete frame
/// currently buffered. A `Settings` frame whose flag reports `end_stream()`
/// is answered with an empty `Settings` frame carrying `FrameFlag::EndStream`
/// and dropped from the buffer; every other frame goes to `handle_h2_res`,
/// which signals when the response is complete.
///
/// # Errors
/// Propagates any error from stream I/O or from `handle_h2_res`.
pub fn h2c_io(&mut self) -> HlsResult<Response> {
    let mut response = Response::new();
    loop {
        self.stream.sync_read(&mut self.buffer)?;
        while let Ok((kind, flags, len)) = H2FrameRBuf::buffer_enough(&self.buffer) {
            let flagged_settings = kind == FrameType::Settings && flags.end_stream();
            if !flagged_settings {
                if self.handle_h2_res(kind, &mut response)? {
                    return Ok(response);
                }
                continue;
            }
            let mut reply = H2Frame::none_frame();
            reply.set_frame_type(FrameType::Settings);
            reply.set_flag(FrameFlag::EndStream);
            self.stream.sync_write(reply.to_bytes().as_ref())?;
            // Discard the handled frame by shifting the rest to the front.
            let buffered = self.buffer.len();
            self.buffer.move_to(len..buffered, 0);
        }
    }
}
}
impl ReqGenExt for ScReq {
fn stream_mut(&mut self) -> &mut Stream {
&mut self.stream
}
}
impl ReqPriExt for ScReq {
fn into_stream(self) -> Stream {
self.stream
}
fn req_param(&mut self) -> ReqParam<'_> {
ReqParam {
header: &mut self.header,
buffer: &mut self.buffer,
hpack_coder: &mut self.hpack_coder,
callback: &mut self.callback,
addr: &self.addr,
scheme: &self.scheme,
sid: &self.stream_id,
}
}
fn body_type_mut(&mut self) -> &mut BodyType {
&mut self.body
}
}
impl ReqExt for ScReq {
    /// Mutable access to the request header.
    fn header_mut(&mut self) -> &mut Header {
        &mut self.header
    }
    /// Shared access to the request header.
    fn header(&self) -> &Header {
        &self.header
    }
    /// Replaces the timeout/retry configuration.
    fn set_timeout(&mut self, timeout: Timeout) {
        self.timeout = timeout;
    }
    /// Shared access to the timeout/retry configuration.
    fn timeout(&self) -> &Timeout {
        &self.timeout
    }
    /// Mutable access to the timeout/retry configuration.
    fn timeout_mut(&mut self) -> &mut Timeout {
        &mut self.timeout
    }
    /// Renders the current target as `scheme://address` followed by the URI.
    ///
    /// A trailing `:80` or `:443` on the rendered address is omitted. Only
    /// the end of the address is inspected, so ports such as `:8080` and any
    /// `:80`/`:443` text inside the path or query are left untouched.
    fn url(&self) -> String {
        let rendered_addr = self.addr.to_string();
        // As before, the scheme is not consulted when dropping the port.
        let authority = rendered_addr
            .strip_suffix(":80")
            .or_else(|| rendered_addr.strip_suffix(":443"))
            .unwrap_or(&rendered_addr);
        format!("{}://{}{}", self.scheme, authority, self.header.uri())
    }
    /// Sets the proxy used by subsequent connection attempts.
    fn set_proxy(&mut self, proxy: Proxy) {
        self.proxy = proxy;
    }
    /// Sets the `verify` flag forwarded to the connection parameters.
    fn set_verify(&mut self, verify: bool) {
        self.verify = verify;
    }
    /// Enables or disables following of 3xx responses in `stream_io`.
    fn set_auto_redirect(&mut self, auto_redirect: bool) {
        self.auto_redirect = auto_redirect;
    }
    /// Stores the client certificates, key and optional CA certificates used
    /// by subsequent connection attempts; a missing CA list becomes empty.
    fn set_mtls(&mut self, certs: Vec<Certificate>, key: RsaKey, ca: Option<Vec<Certificate>>) {
        self.certs = certs;
        self.ca_certs = ca.unwrap_or_default();
        self.key = key;
    }
    /// Installs a callback; it is cleared again after the next completed
    /// exchange (see `handle_io`).
    fn set_callback(&mut self, callback: impl FnMut(&[u8]) -> HlsResult<()> + 'static) {
        self.callback = Some(Box::new(callback));
    }
    /// Replaces the stored fingerprint.
    fn set_fingerprint(&mut self, fingerprint: Fingerprint) {
        self.fingerprint = fingerprint;
    }
}
// Manually marks `ScReq` as `Send`; only compiled with the `export` feature.
// SAFETY: NOTE(review) - the justification is not visible in this file.
// `set_callback` accepts any `FnMut(&[u8]) -> HlsResult<()> + 'static` with no
// `Send` bound, so a stored callback is not guaranteed to be safe to move to
// another thread. Confirm that `export` users never send an `ScReq` holding
// such a callback (or any other non-`Send` field) across threads.
#[cfg(feature = "export")]
unsafe impl Send for ScReq {}