use std::future::Future;
use bytes::{Buf, Bytes};
use h3::client::RequestStream;
use h3_quinn::RecvStream;
use http::response::Parts;
use http::{Request, Response, StatusCode, Version};
use http_body::Body;
use hyper_body_utils::HttpBody;
use crate::{
client::conn::{BaseHttpConnection, ConnectionConfig},
errors::{DeboaError, ResponseError},
Result, MAX_ERROR_MESSAGE_SIZE,
};
/// Behaviour shared by connections that exchange HTTP requests and responses
/// over an `h3` request stream.
///
/// The trait has `private::DeboaUdpConnectionSealed` as a supertrait, so a type
/// must implement that crate-private trait before it can implement this one.
pub trait DeboaUdpConnection: private::DeboaUdpConnectionSealed {
    /// Sender type carried by the [`BaseHttpConnection`] returned from
    /// [`connect`](Self::connect).
    type Sender;
    /// Body type of outgoing requests.
    type ReqBody: Body + Unpin;
    /// Body type of incoming responses.
    type ResBody: Body + Unpin;

    /// Builds a connection from `config`.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error when the implementation fails
    /// to produce a connection.
    fn connect(
        config: &ConnectionConfig,
    ) -> impl Future<Output = Result<BaseHttpConnection<Self::Sender, Self::ReqBody, Self::ResBody>>>;

    /// Returns the HTTP version associated with this connection.
    fn protocol(&self) -> Version;

    /// Sends `request` and resolves to the response produced for it.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error when the implementation cannot
    /// complete the exchange.
    fn send_request(
        &mut self,
        request: Request<Self::ReqBody>,
    ) -> impl Future<Output = Result<Response<Self::ResBody>>>;

    /// Turns already-received response `parts` plus the still-open request
    /// `stream` into a [`Response`].
    ///
    /// When the status is informational, successful or a redirection (and is
    /// not `429 Too Many Requests`), the stream is wrapped in an [`HttpBody`]
    /// without being read here.
    ///
    /// # Errors
    ///
    /// For any other status the body is read from `stream` — at most
    /// `MAX_ERROR_MESSAGE_SIZE` bytes — and returned inside
    /// `DeboaError::Response(ResponseError::Receive { .. })`. The bytes are
    /// decoded lossily as UTF-8, so invalid sequences (including a multi-byte
    /// character split by the size cap) become U+FFFD.
    fn process_response(
        &self,
        parts: Parts,
        mut stream: RequestStream<RecvStream, Bytes>,
    ) -> impl Future<Output = Result<Response<HttpBody>>> + Send {
        async move {
            let status_code = parts.status;
            if (!status_code.is_success()
                && !status_code.is_informational()
                && !status_code.is_redirection())
                || status_code == StatusCode::TOO_MANY_REQUESTS
            {
                let mut error_message = Vec::new();
                // The buffer length is the single source of truth for how much
                // has been collected; stop as soon as the cap is reached.
                while error_message.len() < MAX_ERROR_MESSAGE_SIZE {
                    // Best effort: a receive error or end of stream simply ends
                    // the collection and whatever was gathered is reported.
                    let Ok(Some(chunk)) = stream.recv_data().await else {
                        break;
                    };
                    let bytes = chunk.chunk();
                    // Copy only what still fits so one oversized chunk cannot
                    // push the message past the cap.
                    let remaining = MAX_ERROR_MESSAGE_SIZE - error_message.len();
                    let take = remaining.min(bytes.len());
                    error_message.extend_from_slice(&bytes[..take]);
                }
                return Err(DeboaError::Response(ResponseError::Receive {
                    status_code,
                    message: format!(
                        "Could not process request ({}): {}",
                        status_code,
                        String::from_utf8_lossy(&error_message)
                    ),
                }));
            }

            let body = HttpBody::from_quic_client(stream);
            Ok(Response::from_parts(parts, body))
        }
    }
}
/// Crate-visible module holding the supertrait required by
/// `DeboaUdpConnection`.
pub(crate) mod private {
    /// Marker trait with no methods; implementing it is a prerequisite for
    /// implementing any trait that lists it as a supertrait.
    pub trait DeboaUdpConnectionSealed {}
}