use super::error_resp;
use super::v1::server::HttpSession as SessionV1;
use super::v2::server::HttpSession as SessionV2;
use super::HttpTask;
use crate::protocols::{Digest, SocketAddr, Stream};
use bytes::Bytes;
use http::HeaderValue;
use http::{header::AsHeaderName, HeaderMap};
use log::error;
use pingora_error::Result;
use pingora_http::{RequestHeader, ResponseHeader};
use std::time::Duration;
/// A server-side HTTP session that is either HTTP/1.x or HTTP/2.
///
/// The methods on this type dispatch to the wrapped protocol-specific session so that callers
/// can handle both protocol versions through one API.
pub enum Session {
    /// Session backed by the HTTP/1.x implementation (`v1::server::HttpSession`).
    H1(SessionV1),
    /// Session backed by the HTTP/2 implementation (`v2::server::HttpSession`).
    H2(SessionV2),
}
impl Session {
pub fn new_http1(stream: Stream) -> Self {
Self::H1(SessionV1::new(stream))
}
pub fn new_http2(session: SessionV2) -> Self {
Self::H2(session)
}
pub fn is_http2(&self) -> bool {
matches!(self, Self::H2(_))
}
pub async fn read_request(&mut self) -> Result<bool> {
match self {
Self::H1(s) => {
let read = s.read_request().await?;
Ok(read.is_some())
}
Self::H2(_) => Ok(true),
}
}
pub fn req_header(&self) -> &RequestHeader {
match self {
Self::H1(s) => s.req_header(),
Self::H2(s) => s.req_header(),
}
}
pub fn req_header_mut(&mut self) -> &mut RequestHeader {
match self {
Self::H1(s) => s.req_header_mut(),
Self::H2(s) => s.req_header_mut(),
}
}
pub fn get_header<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
self.req_header().headers.get(key)
}
pub fn get_header_bytes<K: AsHeaderName>(&self, key: K) -> &[u8] {
self.get_header(key).map_or(b"", |v| v.as_bytes())
}
pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
match self {
Self::H1(s) => s.read_body_bytes().await,
Self::H2(s) => s.read_body_bytes().await,
}
}
pub async fn write_response_header(&mut self, resp: Box<ResponseHeader>) -> Result<()> {
match self {
Self::H1(s) => {
s.write_response_header(resp).await?;
Ok(())
}
Self::H2(s) => s.write_response_header(resp, false),
}
}
pub async fn write_response_header_ref(&mut self, resp: &ResponseHeader) -> Result<()> {
match self {
Self::H1(s) => {
s.write_response_header_ref(resp).await?;
Ok(())
}
Self::H2(s) => s.write_response_header_ref(resp, false),
}
}
pub async fn write_response_body(&mut self, data: Bytes, end: bool) -> Result<()> {
if data.is_empty() && !end {
return Ok(());
}
match self {
Self::H1(s) => {
s.write_body(&data).await?;
Ok(())
}
Self::H2(s) => s.write_body(data, end),
}
}
pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
match self {
Self::H1(_) => Ok(()), Self::H2(s) => s.write_trailers(trailers),
}
}
pub async fn finish(self) -> Result<Option<Stream>> {
match self {
Self::H1(mut s) => {
s.finish_body().await?;
Ok(s.reuse().await)
}
Self::H2(mut s) => {
s.finish()?;
Ok(None)
}
}
}
pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
match self {
Self::H1(s) => s.response_duplex_vec(tasks).await,
Self::H2(s) => s.response_duplex_vec(tasks),
}
}
pub fn set_keepalive(&mut self, duration: Option<u64>) {
match self {
Self::H1(s) => s.set_server_keepalive(duration),
Self::H2(_) => {}
}
}
pub fn set_write_timeout(&mut self, timeout: Duration) {
match self {
Self::H1(s) => s.set_write_timeout(timeout),
Self::H2(_) => {}
}
}
pub fn set_min_send_rate(&mut self, rate: usize) {
match self {
Self::H1(s) => s.set_min_send_rate(rate),
Self::H2(_) => {}
}
}
pub fn set_ignore_info_resp(&mut self, ignore: bool) {
match self {
Self::H1(s) => s.set_ignore_info_resp(ignore),
Self::H2(_) => {} }
}
pub fn request_summary(&self) -> String {
match self {
Self::H1(s) => s.request_summary(),
Self::H2(s) => s.request_summary(),
}
}
pub fn response_written(&self) -> Option<&ResponseHeader> {
match self {
Self::H1(s) => s.response_written(),
Self::H2(s) => s.response_written(),
}
}
pub async fn shutdown(&mut self) {
match self {
Self::H1(s) => s.shutdown().await,
Self::H2(s) => s.shutdown(),
}
}
pub fn to_h1_raw(&self) -> Bytes {
match self {
Self::H1(s) => s.get_headers_raw_bytes(),
Self::H2(s) => s.pseudo_raw_h1_request_header(),
}
}
pub fn is_body_done(&mut self) -> bool {
match self {
Self::H1(s) => s.is_body_done(),
Self::H2(s) => s.is_body_done(),
}
}
pub async fn finish_body(&mut self) -> Result<()> {
match self {
Self::H1(s) => s.finish_body().await.map(|_| ()),
Self::H2(s) => s.finish(),
}
}
pub fn generate_error(error: u16) -> ResponseHeader {
match error {
502 => error_resp::HTTP_502_RESPONSE.clone(),
400 => error_resp::HTTP_400_RESPONSE.clone(),
_ => error_resp::gen_error_response(error),
}
}
pub async fn respond_error(&mut self, error: u16) {
let resp = Self::generate_error(error);
self.set_keepalive(None);
self.write_response_header(Box::new(resp))
.await
.unwrap_or_else(|e| {
error!("failed to send error response to downstream: {e}");
});
}
pub fn is_body_empty(&mut self) -> bool {
match self {
Self::H1(s) => s.is_body_empty(),
Self::H2(s) => s.is_body_empty(),
}
}
pub fn retry_buffer_truncated(&self) -> bool {
match self {
Self::H1(s) => s.retry_buffer_truncated(),
Self::H2(s) => s.retry_buffer_truncated(),
}
}
pub fn enable_retry_buffering(&mut self) {
match self {
Self::H1(s) => s.enable_retry_buffering(),
Self::H2(s) => s.enable_retry_buffering(),
}
}
pub fn get_retry_buffer(&self) -> Option<Bytes> {
match self {
Self::H1(s) => s.get_retry_buffer(),
Self::H2(s) => s.get_retry_buffer(),
}
}
pub async fn read_body_or_idle(&mut self, no_body_expected: bool) -> Result<Option<Bytes>> {
match self {
Self::H1(s) => s.read_body_or_idle(no_body_expected).await,
Self::H2(s) => s.read_body_or_idle(no_body_expected).await,
}
}
pub fn as_http1(&self) -> Option<&SessionV1> {
match self {
Self::H1(s) => Some(s),
Self::H2(_) => None,
}
}
pub fn as_http2(&self) -> Option<&SessionV2> {
match self {
Self::H1(_) => None,
Self::H2(s) => Some(s),
}
}
pub async fn write_continue_response(&mut self) -> Result<()> {
match self {
Self::H1(s) => s.write_continue_response().await,
Self::H2(s) => s.write_response_header(
Box::new(ResponseHeader::build(100, Some(0)).unwrap()),
false,
),
}
}
pub fn is_upgrade_req(&self) -> bool {
match self {
Self::H1(s) => s.is_upgrade_req(),
Self::H2(_) => false,
}
}
pub fn body_bytes_sent(&self) -> usize {
match self {
Self::H1(s) => s.body_bytes_sent(),
Self::H2(s) => s.body_bytes_sent(),
}
}
pub fn body_bytes_read(&self) -> usize {
match self {
Self::H1(s) => s.body_bytes_read(),
Self::H2(s) => s.body_bytes_read(),
}
}
pub fn digest(&self) -> Option<&Digest> {
match self {
Self::H1(s) => Some(s.digest()),
Self::H2(s) => s.digest(),
}
}
pub fn digest_mut(&mut self) -> Option<&mut Digest> {
match self {
Self::H1(s) => Some(s.digest_mut()),
Self::H2(s) => s.digest_mut(),
}
}
pub fn client_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.client_addr(),
Self::H2(s) => s.client_addr(),
}
}
pub fn server_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.server_addr(),
Self::H2(s) => s.server_addr(),
}
}
pub fn stream(&self) -> Option<&Stream> {
match self {
Self::H1(s) => Some(s.stream()),
Self::H2(_) => None,
}
}
}