use crate::metadata::MetadataMap;
#[cfg(feature = "transport")]
use crate::transport::Certificate;
use futures_core::Stream;
use http::Extensions;
use std::net::SocketAddr;
#[cfg(feature = "transport")]
use std::sync::Arc;
/// A request message of type `T` bundled with its metadata and extensions.
#[derive(Debug)]
pub struct Request<T> {
/// Metadata attached to the request; turned into HTTP headers by `into_http`.
metadata: MetadataMap,
/// The wrapped message (or stream of messages) itself.
message: T,
/// Type-keyed extension values; `remote_addr` and `peer_certs` look up
/// `ConnectionInfo` here.
extensions: Extensions,
}
/// Connection details that `Request::remote_addr` and `Request::peer_certs`
/// read back out of a request's extensions.
///
/// NOTE(review): where this value gets inserted into the extensions is not
/// visible in this file — presumably the transport layer; confirm.
#[derive(Clone)]
pub(crate) struct ConnectionInfo {
/// Address of the remote peer, if known.
pub(crate) remote_addr: Option<SocketAddr>,
/// Certificates associated with the peer, if any. Only present when the
/// `transport` feature is enabled.
#[cfg(feature = "transport")]
pub(crate) peer_certs: Option<Arc<Vec<Certificate>>>,
}
/// Conversion into a [`Request`] carrying a message of type `T`.
///
/// This file implements it for every `T` (the value is wrapped in a new
/// request) and for `Request<T>` (returned unchanged), so callers can pass
/// either form.
pub trait IntoRequest<T>: sealed::Sealed {
/// Consumes `self` and returns it as a `Request<T>`.
fn into_request(self) -> Request<T>;
}
/// Conversion into a [`Request`] whose payload is a stream of messages.
///
/// This file implements it for every `Send + Sync + 'static` stream and for
/// a `Request` wrapping such a stream.
pub trait IntoStreamingRequest: sealed::Sealed {
/// The stream type carried by the resulting request; it must yield
/// `Self::Message` items and be `Send + Sync + 'static`.
type Stream: Stream<Item = Self::Message> + Send + Sync + 'static;
/// The type of each item produced by `Self::Stream`.
type Message;
/// Consumes `self` and returns it as a `Request` wrapping the stream.
fn into_streaming_request(self) -> Request<Self::Stream>;
}
impl<T> Request<T> {
    /// Creates a new request wrapping `message`, with empty metadata and
    /// empty extensions.
    pub fn new(message: T) -> Self {
        Request {
            metadata: MetadataMap::new(),
            message,
            extensions: Extensions::default(),
        }
    }

    /// Returns a shared reference to the wrapped message.
    pub fn get_ref(&self) -> &T {
        &self.message
    }

    /// Returns a mutable reference to the wrapped message.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.message
    }

    /// Returns a shared reference to the request's metadata.
    pub fn metadata(&self) -> &MetadataMap {
        &self.metadata
    }

    /// Returns a mutable reference to the request's metadata.
    pub fn metadata_mut(&mut self) -> &mut MetadataMap {
        &mut self.metadata
    }

    /// Consumes the request and returns only the wrapped message; metadata
    /// and extensions are dropped.
    pub fn into_inner(self) -> T {
        self.message
    }

    /// Splits the request into its metadata, extensions and message.
    pub(crate) fn into_parts(self) -> (MetadataMap, Extensions, T) {
        (self.metadata, self.extensions, self.message)
    }

    /// Reassembles a request from the pieces produced by `into_parts`.
    pub(crate) fn from_parts(metadata: MetadataMap, extensions: Extensions, message: T) -> Self {
        Self {
            metadata,
            extensions,
            message,
        }
    }

    /// Builds a request from already-split HTTP request parts and a message.
    ///
    /// The HTTP headers become the metadata and the HTTP extensions are
    /// carried over; the remaining parts (method, URI, version) are dropped.
    pub(crate) fn from_http_parts(parts: http::request::Parts, message: T) -> Self {
        Request {
            metadata: MetadataMap::from_headers(parts.headers),
            message,
            extensions: parts.extensions,
        }
    }

    /// Converts an `http::Request<T>` into a `Request<T>`, keeping its
    /// headers (as metadata), extensions and body.
    pub fn from_http(http: http::Request<T>) -> Self {
        let (parts, message) = http.into_parts();
        Request::from_http_parts(parts, message)
    }

    /// Converts this request into an HTTP/2 `POST` request addressed to `uri`.
    ///
    /// Headers come from `MetadataMap::into_sanitized_headers`; the test at
    /// the bottom of this file expects gRPC-reserved headers to be absent
    /// from the result. Extensions are moved onto the HTTP request as-is.
    pub(crate) fn into_http(self, uri: http::Uri) -> http::Request<T> {
        let mut request = http::Request::new(self.message);

        *request.version_mut() = http::Version::HTTP_2;
        *request.method_mut() = http::Method::POST;
        *request.uri_mut() = uri;
        *request.headers_mut() = self.metadata.into_sanitized_headers();
        *request.extensions_mut() = self.extensions;

        request
    }

    /// Transforms the wrapped message with `f`, keeping the request's
    /// metadata and extensions.
    #[doc(hidden)]
    pub fn map<F, U>(self, f: F) -> Request<U>
    where
        F: FnOnce(T) -> U,
    {
        let Request {
            metadata,
            message,
            extensions,
        } = self;

        Request {
            metadata,
            message: f(message),
            // Carry the extensions over instead of resetting them: values
            // such as `ConnectionInfo` must survive a message transformation,
            // otherwise `remote_addr`/`peer_certs` return `None` afterwards.
            extensions,
        }
    }

    /// Returns the remote peer's address.
    ///
    /// Yields `None` when no `ConnectionInfo` is stored in the extensions or
    /// when that info carries no address.
    pub fn remote_addr(&self) -> Option<SocketAddr> {
        self.get::<ConnectionInfo>()?.remote_addr
    }

    /// Returns the peer's certificates.
    ///
    /// Yields `None` when no `ConnectionInfo` is stored in the extensions or
    /// when that info carries no certificates. Cloning only bumps the `Arc`
    /// reference count; the certificate list itself is shared.
    #[cfg(feature = "transport")]
    #[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
    pub fn peer_certs(&self) -> Option<Arc<Vec<Certificate>>> {
        self.get::<ConnectionInfo>()?.peer_certs.clone()
    }

    /// Looks up a value of type `I` in the request's extensions.
    pub(crate) fn get<I: Send + Sync + 'static>(&self) -> Option<&I> {
        self.extensions.get::<I>()
    }
}
/// Any value can be used where a request is expected: it is wrapped in a
/// fresh [`Request`] created by `Request::new`.
impl<T> IntoRequest<T> for T {
    /// Wraps `self` in a new `Request`.
    fn into_request(self) -> Request<T> {
        Request::<T>::new(self)
    }
}
/// A value that is already a [`Request`] converts to itself, so its existing
/// metadata and extensions are kept.
impl<T> IntoRequest<T> for Request<T> {
    /// Returns `self` unchanged.
    fn into_request(self) -> Self {
        self
    }
}
/// Any `Send + Sync + 'static` stream can be used where a streaming request
/// is expected: it is wrapped in a fresh [`Request`] created by `Request::new`.
impl<T> IntoStreamingRequest for T
where
    T: Stream + Send + Sync + 'static,
{
    type Stream = T;
    type Message = <T as Stream>::Item;

    /// Wraps the stream in a new `Request`.
    fn into_streaming_request(self) -> Request<Self::Stream> {
        Request::<T>::new(self)
    }
}
/// A [`Request`] that already wraps a `Send + Sync + 'static` stream converts
/// to itself, so its existing metadata and extensions are kept.
impl<T> IntoStreamingRequest for Request<T>
where
    T: Stream + Send + Sync + 'static,
{
    type Stream = T;
    type Message = <T as Stream>::Item;

    /// Returns `self` unchanged.
    fn into_streaming_request(self) -> Request<Self::Stream> {
        self
    }
}
// Blanket impl: every (sized) type satisfies the `Sealed` supertrait bound
// required by `IntoRequest` and `IntoStreamingRequest`.
// NOTE(review): because this covers all `T`, the bound itself does not
// restrict which types may implement those traits — confirm this is intended.
impl<T> sealed::Sealed for T {}
/// Private module holding the marker supertrait of the conversion traits;
/// the module is not `pub`, so the trait cannot be named through this path
/// from outside the parent module.
mod sealed {
/// Marker trait with no methods, used as a supertrait bound.
pub trait Sealed {}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::MetadataValue;
    use http::Uri;

    /// Every gRPC-reserved header inserted into the metadata must be absent
    /// from the headers of the converted `http::Request`.
    #[test]
    fn reserved_headers_are_excluded() {
        let mut request = Request::new(1);

        // Fill the metadata with nothing but reserved header names.
        let metadata = request.metadata_mut();
        for reserved in &MetadataMap::GRPC_RESERVED_HEADERS {
            metadata.insert(*reserved, MetadataValue::from_static("invalid"));
        }

        let converted = request.into_http(Uri::default());
        assert!(converted.headers().is_empty());
    }
}