use std::future::Future;
use http::{Request, Response, StatusCode, Version};
use http_body::Body;
use http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper_body_utils::HttpBody;
use crate::{
client::conn::{BaseHttpConnection, ConnectionConfig},
errors::{DeboaError, RequestError, ResponseError},
Result, MAX_ERROR_MESSAGE_SIZE,
};
pub trait DeboaTcpConnection: private::DeboaTcpConnectionSealed {
type Sender;
type ReqBody: Body + Unpin;
type ResBody: Body + Unpin;
fn connect(
config: &ConnectionConfig,
) -> impl Future<Output = Result<BaseHttpConnection<Self::Sender, Self::ReqBody, Self::ResBody>>>;
fn protocol(&self) -> Version;
fn send_request(
&mut self,
request: Request<Self::ReqBody>,
) -> impl Future<Output = Result<Response<Self::ResBody>>>;
fn process_response(
&self,
_method: &str,
response: std::result::Result<Response<Incoming>, hyper::Error>,
) -> impl Future<Output = Result<Response<HttpBody>>> + Send {
async {
if let Err(err) = response {
return Err(DeboaError::Request(RequestError::Send {
url: "".to_string(),
message: err.to_string(),
}));
}
let response = response.unwrap();
let status_code = response.status();
if (!status_code.is_success()
&& !status_code.is_informational()
&& !status_code.is_redirection())
|| status_code == StatusCode::TOO_MANY_REQUESTS
{
let mut body = response.into_body();
let mut error_message = Vec::new();
let mut downloaded = 0;
while let Some(chunk) = body.frame().await {
if let Ok(frame) = chunk {
if let Some(data) = frame.data_ref() {
if downloaded + data.len() > MAX_ERROR_MESSAGE_SIZE {
break;
}
error_message.extend_from_slice(data);
downloaded += data.len();
}
}
}
return Err(DeboaError::Response(ResponseError::Receive {
status_code,
message: format!(
"Could not process request ({}): {}",
status_code,
String::from_utf8_lossy(&error_message)
),
}));
}
let (parts, body) = response.into_parts();
let response = Response::from_parts(parts, HttpBody::from_incoming(body));
Ok(response)
}
}
}
pub(crate) mod private {
pub trait DeboaTcpConnectionSealed {}
}