use bytes::{Buf, Bytes};
use dhttp_identity::identity as authority;
use futures::{Stream, StreamExt};
use http::{
HeaderMap, HeaderValue, Method, Uri,
header::{AsHeaderName, IntoHeaderName},
};
use snafu::{OptionExt, Report, ResultExt, Snafu};
use std::{future::Future, sync::Arc};
use tracing::Instrument;
use crate::{
h3x::{
dhttp::message::{MessageReader, MessageStreamError, MessageWriter},
endpoint::UnresolvedRequest,
error::Code,
protocol::Protocols,
qpack::field::Protocol,
stream_id::StreamId,
},
message::{
Body, IntoBody, MessageOperationError, ReadBufferedBodyError, ReadStreamingBodyError,
ReadToStringError, ReadTrailersError, RequestMessage, ResponseMessage,
WriteStreamingBodyError,
},
};
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum ResolveError {
#[snafu(display("failed to read server local agent"))]
LocalAuthority {
source: crate::h3x::quic::ConnectionError,
},
#[snafu(display("server request is missing local agent"))]
MissingLocalAuthority,
#[snafu(display("failed to read server remote authority"))]
RemoteAuthority {
source: crate::h3x::quic::ConnectionError,
},
#[snafu(display("failed to read request header"))]
ReadHeader { source: MessageStreamError },
}
pub async fn resolve(request: UnresolvedRequest) -> Result<(Request, Response), ResolveError> {
let UnresolvedRequest {
stream_id,
read_stream,
write_stream,
connection,
} = request;
let local_authority = connection
.local_authority()
.await
.context(resolve_error::LocalAuthoritySnafu)?
.context(resolve_error::MissingLocalAuthoritySnafu)?;
let remote_authority = connection
.remote_authority()
.await
.context(resolve_error::RemoteAuthoritySnafu)?;
let protocols = connection.protocols().clone();
let mut read_stream = read_stream;
let request_message = RequestMessage::read_from(&mut read_stream)
.await
.context(resolve_error::ReadHeaderSnafu)?;
let request = Request {
message: request_message,
stream: read_stream,
authority: remote_authority,
stream_id,
protocols: protocols.clone(),
};
let response = Response {
message: Some(ResponseMessage::default()),
stream: Some(write_stream),
authority: local_authority,
stream_id,
protocols,
};
Ok((request, response))
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum ResponseWriteError {
#[snafu(display("response is already finalized"))]
ResponseFinalized,
#[snafu(display("response message operation failed"))]
MessageOperation { source: MessageOperationError },
#[snafu(transparent)]
Body { source: WriteStreamingBodyError },
}
pub struct Request {
message: RequestMessage,
stream: MessageReader,
authority: Option<Arc<dyn authority::RemoteAuthority>>,
stream_id: StreamId,
protocols: Arc<Protocols>,
}
impl Request {
pub fn method(&self) -> Method {
self.message.method().clone()
}
pub fn protocol(&self) -> Option<Protocol> {
self.message.header().protocol().cloned()
}
pub fn uri(&self) -> Uri {
self.message.uri()
}
pub fn headers(&self) -> &http::HeaderMap {
self.message.header().header_map()
}
pub fn header(&self, name: impl AsHeaderName) -> Option<&HeaderValue> {
self.headers().get(name)
}
pub async fn read(&mut self) -> Option<Result<Bytes, ReadStreamingBodyError>> {
self.message
.read_streaming_body_from(&mut self.stream)
.await
}
pub async fn read_all(&mut self) -> Result<impl Buf, ReadBufferedBodyError> {
self.message.read_buffered_body_from(&mut self.stream).await
}
pub async fn read_to_bytes(&mut self) -> Result<Bytes, ReadBufferedBodyError> {
self.message.collect_bytes_body_from(&mut self.stream).await
}
pub async fn read_to_string(&mut self) -> Result<String, ReadToStringError> {
self.message
.collect_string_body_from(&mut self.stream)
.await
}
pub async fn as_stream(&mut self) -> impl Stream<Item = Result<Bytes, ReadStreamingBodyError>> {
futures::stream::unfold(self, async |this| {
this.read().await.map(|item| (item, this))
})
.fuse()
}
pub async fn into_stream(self) -> impl Stream<Item = Result<Bytes, ReadStreamingBodyError>> {
futures::stream::unfold(self, async |mut this| {
this.read().await.map(|item| (item, this))
})
.fuse()
}
pub async fn trailers(&mut self) -> Result<&HeaderMap, ReadTrailersError> {
self.message.read_trailers_from(&mut self.stream).await
}
pub async fn stop(&mut self, code: Code) -> Result<(), MessageStreamError> {
self.stream.stop(code).await
}
pub fn authority(&self) -> Option<&Arc<dyn authority::RemoteAuthority>> {
self.authority.as_ref()
}
pub fn stream_id(&self) -> StreamId {
self.stream_id
}
pub fn protocols(&self) -> &Arc<Protocols> {
&self.protocols
}
}
pub struct Response {
message: Option<ResponseMessage>,
stream: Option<MessageWriter>,
authority: Arc<dyn authority::LocalAuthority>,
stream_id: StreamId,
protocols: Arc<Protocols>,
}
impl Response {
fn check_message_operation(
&mut self,
operation: &str,
operate: impl FnOnce(&mut ResponseMessage) -> Result<(), MessageOperationError>,
) -> bool {
if self.message.is_none() || self.stream.is_none() {
tracing::warn!(
operation,
"response is already finalized, operation will not affect the response stream",
);
return false;
}
let message = self
.message
.as_mut()
.expect("response message is present after explicit check");
if let Err(error) = operate(message) {
let report = Report::from_error(&error);
tracing::warn!(
operation, error = %report,
"response message operation failed, operation will not affect the response stream",
);
return false;
}
true
}
pub fn headers(&self) -> &http::HeaderMap {
self.message
.as_ref()
.expect("response message is unavailable after finalization")
.header()
.header_map()
}
pub fn headers_mut(&mut self) -> &mut http::HeaderMap {
self.check_message_operation("modify_headers", |message| {
message.header_mut()?;
Ok(())
});
self.message
.as_mut()
.expect("response message is unavailable after finalization")
.header_mut_unchecked()
.header_map_mut()
}
pub fn set_header(&mut self, name: impl IntoHeaderName, value: HeaderValue) -> &mut Self {
self.check_message_operation("set_header", |message| {
message.header_mut()?.header_map_mut().insert(name, value);
Ok(())
});
self
}
pub fn status(&self) -> Option<http::StatusCode> {
self.message.as_ref().map(ResponseMessage::status)
}
pub fn set_status(&mut self, status: http::StatusCode) -> &mut Self {
self.check_message_operation("set_status", |message| {
message.header_mut()?.set_status(status);
Ok(())
});
self
}
pub fn set_body(&mut self, content: impl IntoBody) -> &mut Self {
self.check_message_operation("write_chunked_body", |message| {
if message.is_interim_response() {
return Err(MessageOperationError::BodyOrTrailerOnInterimResponse);
}
message.set_body(content)?;
Ok(())
});
self
}
pub fn write<B>(
&mut self,
content: B,
) -> impl Future<Output = Result<&mut Self, ResponseWriteError>> + use<'_, B>
where
B: IntoBody,
{
let content: Body = content.into_body();
async move {
let message = self
.message
.as_mut()
.ok_or(ResponseWriteError::ResponseFinalized)?;
if message.is_interim_response() {
return Err(
Err::<(), _>(MessageOperationError::BodyOrTrailerOnInterimResponse)
.context(response_write_error::MessageOperationSnafu)
.expect_err("response write message operation conversion must fail"),
);
}
let stream = self
.stream
.as_mut()
.ok_or(ResponseWriteError::ResponseFinalized)?;
message.write_streaming_body_to(stream, content).await?;
Ok(self)
}
}
pub async fn flush(&mut self) -> Result<&mut Self, MessageStreamError> {
let message = self
.message
.as_mut()
.ok_or(MessageStreamError::MessageSendFailed)?;
let stream = self
.stream
.as_mut()
.ok_or(MessageStreamError::MessageSendFailed)?;
message.write_all_to(stream).await?;
stream.flush().await?;
Ok(self)
}
pub fn trailers(&self) -> &HeaderMap {
self.message
.as_ref()
.expect("response message is unavailable after finalization")
.trailers()
}
pub fn trailers_mut(&mut self) -> &mut HeaderMap {
self.check_message_operation("modify_trailers", |message| {
if message.is_interim_response() {
return Err(MessageOperationError::BodyOrTrailerOnInterimResponse);
}
message.trailers_mut()?;
Ok(())
});
self.message
.as_mut()
.expect("response message is unavailable after finalization")
.trailers_mut_unchecked()
}
pub fn set_trailer(&mut self, name: impl IntoHeaderName, value: HeaderValue) -> &mut Self {
self.check_message_operation("set_trailer", |message| {
if message.is_interim_response() {
return Err(MessageOperationError::BodyOrTrailerOnInterimResponse);
}
message.trailers_mut()?.insert(name, value);
Ok(())
});
self
}
pub fn set_trailers(&mut self, map: HeaderMap) -> &mut Self {
self.check_message_operation("set_trailers", |message| {
if message.is_interim_response() {
return Err(MessageOperationError::BodyOrTrailerOnInterimResponse);
}
*message.trailers_mut()? = map;
Ok(())
});
self
}
pub async fn close(&mut self) -> Result<(), MessageStreamError> {
if let Some(future) = self.finish() {
future.await
} else {
Ok(())
}
}
pub async fn reset(&mut self, code: Code) -> Result<(), MessageStreamError> {
self.message = None;
if let Some(mut stream) = self.stream.take() {
stream.reset(code).await
} else {
Ok(())
}
}
pub fn authority(&self) -> &Arc<dyn authority::LocalAuthority> {
&self.authority
}
pub fn stream_id(&self) -> StreamId {
self.stream_id
}
pub fn protocols(&self) -> &Arc<Protocols> {
&self.protocols
}
pub fn finish(
&mut self,
) -> Option<impl Future<Output = Result<(), MessageStreamError>> + Send + use<>> {
let mut message = self.message.take()?;
let mut stream = self
.stream
.take()
.expect("response stream is unavailable while message is unfinished");
Some(async move {
if message.is_interim_response() {
let error = MessageOperationError::FinalResponseRequired;
let report = Report::from_error(&error);
tracing::warn!(
error = %report,
"response stream cannot be closed without a final response",
);
_ = stream.reset(Code::H3_MESSAGE_ERROR).await;
return Err(MessageStreamError::MessageSendFailed);
}
message.write_all_to(&mut stream).await?;
stream.close().await
})
}
pub(crate) fn drop(
&mut self,
) -> Option<impl Future<Output = Result<(), MessageStreamError>> + Send + use<>> {
self.finish()
}
}
impl Drop for Response {
fn drop(&mut self) {
if let Some(future) = self.finish() {
tokio::spawn(
async move {
if let Err(error) = future.await {
let report = Report::from_error(&error);
tracing::debug!(error = %report, "failed to finish response on drop");
}
}
.in_current_span(),
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn authority_accessors_have_directional_identity_names() {
let _request_authority = |request: &Request| {
let _: Option<&Arc<dyn authority::RemoteAuthority>> = request.authority();
};
let _response_authority = |response: &Response| {
let _: &Arc<dyn authority::LocalAuthority> = response.authority();
};
}
}