use {
crate::{
body::Body,
downstream::DownstreamResponse,
error::Error,
framing::{content_length_is_valid, transfer_encoding_is_supported},
headers::filter_outgoing_headers,
pushpin::PushpinRedirectInfo,
session::ViceroyResponseMetadata,
wiggle_abi::types::FramingHeadersMode,
},
hyper::http::response::Response,
std::mem,
tokio::sync::oneshot::Sender,
};
pub enum DownstreamResponseState {
Closed,
Pending(Sender<DownstreamResponse>),
RedirectingToPushpin,
Sent,
}
impl DownstreamResponseState {
pub fn new(sender: Sender<DownstreamResponse>) -> Self {
DownstreamResponseState::Pending(sender)
}
pub fn is_unsent(&self) -> bool {
matches!(self, Self::Pending(_))
}
pub fn send(&mut self, mut response: Response<Body>) -> Result<(), Error> {
use DownstreamResponseState::{Closed, Pending, RedirectingToPushpin, Sent};
let mut framing_headers_mode = response
.extensions()
.get::<ViceroyResponseMetadata>()
.map(|metadata: &ViceroyResponseMetadata| metadata.framing_headers_mode)
.unwrap_or(FramingHeadersMode::Automatic);
if framing_headers_mode == FramingHeadersMode::ManuallyFromHeaders {
if !content_length_is_valid(response.headers()) {
tracing::warn!(
"Downstream response has malformed Content-Length header, falling back to automatic framing."
);
framing_headers_mode = FramingHeadersMode::Automatic;
} else if !transfer_encoding_is_supported(response.headers()) {
tracing::warn!(
"Downstream response has unsupported Transfer-Encoding header, falling back to automatic framing."
);
framing_headers_mode = FramingHeadersMode::Automatic;
} else if !response
.headers()
.contains_key(hyper::header::CONTENT_LENGTH)
&& !response
.headers()
.contains_key(hyper::header::TRANSFER_ENCODING)
{
tracing::warn!(
"Downstream response has neither Content-Length nor Transfer-Encoding header, falling back to automatic framing."
);
framing_headers_mode = FramingHeadersMode::Automatic;
}
}
if framing_headers_mode != FramingHeadersMode::ManuallyFromHeaders {
filter_outgoing_headers(response.headers_mut());
}
if response.status().as_u16() == 103 {
tracing::warn!(
"Guest returned 103 Early Hints response which will not be sent to the client"
);
tracing::info!("{:#?}", response);
return Ok(());
} else if response.status().is_informational() {
return Err(Error::InvalidArgument);
}
match mem::replace(self, Sent) {
Closed => panic!("downstream response channel was closed"),
Pending(sender) => sender
.send(DownstreamResponse::Http(response))
.map_err(|_| ())
.expect("response receiver is open"),
Sent | RedirectingToPushpin => return Err(Error::DownstreamRespSending),
}
Ok(())
}
pub fn redirect_to_pushpin(&mut self, redirect_info: PushpinRedirectInfo) -> Result<(), Error> {
use DownstreamResponseState::{Closed, Pending, RedirectingToPushpin, Sent};
match mem::replace(self, RedirectingToPushpin) {
Closed => panic!("downstream response channel was closed"),
Pending(sender) => sender
.send(DownstreamResponse::RedirectToPushpin(redirect_info))
.map_err(|_| ())
.expect("response receiver is open"),
Sent | RedirectingToPushpin => return Err(Error::DownstreamRespSending),
}
Ok(())
}
#[allow(unused)]
pub fn close(&mut self) {
*self = DownstreamResponseState::Closed;
}
}