use crate::body::BodyType;
use crate::error::HlsResult;
use crate::ext::{ReqExt, ReqParam};
use crate::ext::{ReqGenExt, ReqPriExt};
use crate::hpack::HPackCoding;
use crate::json::JsonValue;
use crate::packet::{FrameFlag, FrameType, H2Frame, H2FrameRBuf, HeaderParam};
use crate::reader::{ReadExt, Reader};
use crate::request::RequestBuffer;
use crate::stream::{ConnParam, Proxy, Stream};
use crate::*;
use std::mem;
pub struct AcReq {
header: Header,
scheme: Scheme,
addr: Addr,
stream: Stream,
timeout: Timeout,
callback: Option<ReqCallback>,
stream_id: u32,
body: BodyType,
proxy: Proxy,
fingerprint: Fingerprint,
verify: bool,
auto_redirect: bool,
buffer: Buffer,
hpack_coder: HPackCoding,
certs: Vec<Certificate>,
key: RsaKey,
ca_certs: Vec<Certificate>,
}
impl Default for AcReq {
fn default() -> Self {
AcReq {
header: Header::new_req_h1(),
scheme: Scheme::Http,
addr: Addr::default(),
stream: Stream::NonConnection,
timeout: Timeout::default(),
callback: None,
stream_id: 0,
proxy: Proxy::Null,
fingerprint: Fingerprint::default(),
body: BodyType::new_byte(vec![]),
verify: true,
auto_redirect: true,
buffer: Buffer::with_capacity(32826),
hpack_coder: HPackCoding::new(4096),
certs: vec![],
key: RsaKey::none(),
ca_certs: vec![],
}
}
}
impl AcReq {
pub fn new() -> AcReq {
AcReq::default()
}
pub async fn get(&mut self) -> HlsResult<Response> {
self.header.set_method(Method::GET);
self.stream_io().await
}
pub async fn post(&mut self) -> HlsResult<Response> {
self.header.set_method(Method::POST);
self.stream_io().await
}
pub async fn put(&mut self) -> HlsResult<Response> {
self.header.set_method(Method::PUT);
self.stream_io().await
}
pub async fn options(&mut self) -> HlsResult<Response> {
self.header.set_method(Method::OPTIONS);
self.stream_io().await
}
pub async fn delete(&mut self) -> HlsResult<Response> {
self.header.set_method(Method::DELETE);
self.stream_io().await
}
pub async fn head(&mut self) -> HlsResult<Response> {
self.header.set_method(Method::HEAD);
self.stream_io().await
}
pub async fn trace(&mut self) -> HlsResult<Response> {
self.header.set_method(Method::TRACE);
self.stream_io().await
}
pub async fn patch(&mut self) -> HlsResult<Response> {
self.header.set_method(Method::PATCH);
self.stream_io().await
}
pub async fn h1_io(&mut self) -> HlsResult<Response> {
let mut response = Response::new();
let mut read_len = 0;
loop {
self.stream.async_read(&mut self.buffer).await?;
if self.handle_h1_res(&mut response, &mut read_len)? { break; }
}
Ok(response)
}
pub(crate) async fn handle_io(&mut self) -> HlsResult<Response> {
let mut request = RequestBuffer::new(&mut self.header, &mut self.body, HeaderParam {
addr: &self.addr,
scheme: &self.scheme,
encoder: self.hpack_coder.encoder(),
stream_identifier: &self.stream_id,
body_len: 0,
})?;
self.buffer.reset();
loop {
let mut render = Reader::new(self.buffer.unfilled_mut());
let len = request.read(&mut render)?;
if len == 0 { break; }
self.stream.async_write(render.filled()).await?;
}
let response = match self.header.alpn() {
ALPN::Http20 => self.h2c_io().await,
_ => self.h1_io().await
}?;
self.update_cookie(&response);
self.callback = None;
if let ALPN::Http20 = self.header.alpn() { self.stream_id += 2; }
Ok(response)
}
pub async fn stream_io(&mut self) -> HlsResult<Response> {
for i in 0..self.timeout.handle_times() {
let res = tokio::time::timeout(self.timeout.handle(), self.handle_io()).await;
self.buffer.reset();
match res {
Ok(res) => match res {
Ok(res) => {
let code = res.header().status().code();
return if self.auto_redirect && (300..400).contains(&code) {
let location = res.header().location().ok_or("missing location")?;
if location.starts_with("http") {
self.set_url(location).await?;
} else {
self.header.set_uri(Uri::try_from(location)?);
}
self.header.set_method(Method::GET);
Box::pin(self.stream_io()).await
} else {
Ok(res)
};
}
Err(e) => {
if i != self.timeout.handle_times() - 1 {
if e.to_string().to_lowercase().contains("close") || e.to_string().contains("䏿¢äº†") || e.to_string().contains("å…³é—") {
self.re_conn().await?;
}
println!("[AcReq] write/recv with error-{}, handle: {}/{}", e, i + 2, self.timeout.handle_times());
continue;
}
}
}
Err(_) => if i != self.timeout.handle_times() - 1 {
println!("[AcReq] write/recv timeout, timeout: {:?}, handle: {}/{}", self.timeout.handle(), i + 2, self.timeout.handle_times());
continue;
}
}
}
Err("stream io error".into())
}
pub async fn re_conn(&mut self) -> HlsResult<()> {
self.buffer.reset();
for i in 0..self.timeout.connect_times() {
let param = ConnParam {
scheme: &self.scheme,
addr: &self.addr,
proxy: &self.proxy,
timeout: &self.timeout,
fingerprint: &mut self.fingerprint,
alpn: self.header.alpn(),
verify: self.verify,
cert: &mut self.certs,
key: &mut self.key,
ca_cert: &self.ca_certs,
};
let res = tokio::time::timeout(self.timeout.connect(), self.stream.async_conn(param)).await;
match &res {
Ok(res) => if let Err(e) = res && i != self.timeout.handle_times() - 1 {
println!("[AcReq] connect with error-{}, handle: {}/{}", e, i + 2, self.timeout.handle_times());
continue;
}
Err(e) => if i != self.timeout.handle_times() - 1 {
println!("[AcReq] connect error, error: {:?}, handle: {}/{}", e.to_string(), i + 2, self.timeout.handle_times());
continue;
}
}
return match res {
Ok(res) => match res {
Ok(alpn) => {
self.header.init_by_alpn(alpn);
if self.header.alpn() == &ALPN::Http20 { self.handle_h2_setting().await?; }
Ok(())
}
Err(e) => Err(e),
},
Err(_) => Err(format!("connect timeout, handle:{}; timeout: {:?}", self.timeout.handle_times(), self.timeout.connect()).into())
};
}
Err("[AcReq] connection error".into())
}
pub async fn with_url(mut self, url: impl AsRef<str>) -> HlsResult<Self> {
self.set_url(url).await?;
Ok(self)
}
pub async fn new_with_url(url: impl AsRef<str>) -> HlsResult<AcReq> {
let mut res = Self::new();
res.set_url(url).await?;
Ok(res)
}
pub async fn set_url(&mut self, url: impl AsRef<str>) -> HlsResult<()> {
let (scheme, addr, uri) = Url::try_from(url.as_ref())?.into_inner();
let old_addr = mem::replace(&mut self.addr, addr);
let old_scheme = mem::replace(&mut self.scheme, scheme);
self.header.set_uri(uri);
drop(mem::replace(&mut self.body, BodyType::Bytes(vec![])));
if self.addr.host() != old_addr.host() || self.scheme != old_scheme {
self.re_conn().await?;
}
Ok(())
}
pub async fn send_check(&mut self, method: Method) -> HlsResult<Response> {
self.header.set_method(method);
let response = self.stream_io().await?;
self.check_status(&response)?;
Ok(response)
}
pub async fn send_check_json(&mut self, method: Method, k: impl AsRef<str>, v: impl ToString, e: Vec<impl AsRef<str>>) -> HlsResult<JsonValue> {
let response = self.send_check(method).await?;
self.check_res(response, k, v, e)
}
}
impl AcReq {
pub async fn handle_h2_setting(&mut self) -> HlsResult<()> {
self.stream_id = 0;
self.buffer.write_slice(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
self.buffer.write_slice(self.fingerprint.h2_setting());
self.hpack_coder = HPackCoding::new(65535);
self.buffer.write_slice(self.fingerprint.h2_window_update());
self.stream.async_write(self.buffer.filled()).await?;
self.buffer.reset();
self.stream_id += 1;
Ok(())
}
pub async fn h2c_io(&mut self) -> HlsResult<Response> {
let mut response = Response::new();
loop {
self.stream.async_read(&mut self.buffer).await?;
while let Ok((frame_type, frame_flag, frame_len)) = H2FrameRBuf::buffer_enough(&self.buffer) {
if frame_type == FrameType::Settings && frame_flag.end_stream() {
let mut end_frame = H2Frame::none_frame();
end_frame.set_frame_type(FrameType::Settings);
end_frame.set_flag(FrameFlag::EndStream);
self.stream.async_write(end_frame.to_bytes().as_ref()).await?;
self.buffer.move_to(frame_len..self.buffer.len(), 0);
continue;
}
if self.handle_h2_res(frame_type, &mut response)? { return Ok(response); }
}
}
}
}
impl ReqGenExt for AcReq {
fn stream_mut(&mut self) -> &mut Stream {
&mut self.stream
}
}
impl ReqPriExt for AcReq {
fn into_stream(self) -> Stream {
self.stream
}
fn req_param(&mut self) -> ReqParam<'_> {
ReqParam {
header: &mut self.header,
buffer: &mut self.buffer,
hpack_coder: &mut self.hpack_coder,
callback: &mut self.callback,
addr: &self.addr,
sid: &self.stream_id,
scheme: &self.scheme,
}
}
fn body_type_mut(&mut self) -> &mut BodyType {
&mut self.body
}
}
impl ReqExt for AcReq {
fn header_mut(&mut self) -> &mut Header {
&mut self.header
}
fn header(&self) -> &Header {
&self.header
}
fn set_timeout(&mut self, timeout: Timeout) {
self.timeout = timeout;
}
fn timeout(&self) -> &Timeout {
&self.timeout
}
fn timeout_mut(&mut self) -> &mut Timeout {
&mut self.timeout
}
fn url(&self) -> String {
format!("{}://{}{}", self.scheme, self.addr, self.header.uri()).replace(":80", "").replace(":443", "")
}
fn set_proxy(&mut self, proxy: Proxy) {
self.proxy = proxy;
}
fn set_verify(&mut self, verify: bool) {
self.verify = verify;
}
fn set_auto_redirect(&mut self, auto_redirect: bool) {
self.auto_redirect = auto_redirect;
}
fn set_mtls(&mut self, certs: Vec<Certificate>, key: RsaKey, ca: Option<Vec<Certificate>>) {
self.certs = certs;
self.key = key;
self.ca_certs = ca.unwrap_or(vec![]);
}
fn set_callback(&mut self, callback: impl FnMut(&[u8]) -> HlsResult<()> + 'static) {
self.callback = Some(Box::new(callback));
}
fn set_fingerprint(&mut self, fingerprint: Fingerprint) {
self.fingerprint = fingerprint;
}
}
unsafe impl Send for AcReq {}
unsafe impl Sync for AcReq {}