tower_grpc/
body.rs

1use self::sealed::Sealed;
2use crate::error::Error;
3use crate::Status;
4
5use bytes::{Buf, Bytes, IntoBuf};
6use futures::{try_ready, Poll};
7pub use http_body::Body as HttpBody;
8use std::fmt;
9
10type BytesBuf = <Bytes as IntoBuf>::Buf;
11
12/// A "trait alias" for `tower_http_service::Body` with bounds required by
13/// tower-grpc.
14///
15/// Not to be implemented directly, but instead useful for reducing bounds
16/// boilerplate.
17pub trait Body: Sealed {
18    type Data: Buf;
19    type Error: Into<Error>;
20
21    fn is_end_stream(&self) -> bool;
22
23    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error>;
24
25    fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error>;
26}
27
28impl<T> Body for T
29where
30    T: HttpBody,
31    T::Error: Into<Error>,
32{
33    type Data = T::Data;
34    type Error = T::Error;
35
36    fn is_end_stream(&self) -> bool {
37        HttpBody::is_end_stream(self)
38    }
39
40    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
41        HttpBody::poll_data(self)
42    }
43
44    fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error> {
45        HttpBody::poll_trailers(self)
46    }
47}
48
49impl<T> Sealed for T
50where
51    T: HttpBody,
52    T::Error: Into<Error>,
53{
54}
55
56/// Dynamic `Send` body object.
57pub struct BoxBody {
58    inner: Box<dyn Body<Data = BytesBuf, Error = Status> + Send>,
59}
60
61struct MapBody<B>(B);
62
63// ===== impl BoxBody =====
64
65impl BoxBody {
66    /// Create a new `BoxBody` backed by `inner`.
67    pub fn new(inner: Box<dyn Body<Data = BytesBuf, Error = Status> + Send>) -> Self {
68        BoxBody { inner }
69    }
70
71    /// Create a new `BoxBody` mapping item and error to the default types.
72    pub fn map_from<B>(inner: B) -> Self
73    where
74        B: Body + Send + 'static,
75        B::Data: Into<Bytes>,
76    {
77        BoxBody::new(Box::new(MapBody(inner)))
78    }
79}
80
81impl HttpBody for BoxBody {
82    type Data = BytesBuf;
83    type Error = Status;
84
85    fn is_end_stream(&self) -> bool {
86        self.inner.is_end_stream()
87    }
88
89    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
90        self.inner.poll_data()
91    }
92
93    fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error> {
94        self.inner.poll_trailers()
95    }
96}
97
98impl fmt::Debug for BoxBody {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        f.debug_struct("BoxBody").finish()
101    }
102}
103
104// ===== impl MapBody =====
105
106impl<B> HttpBody for MapBody<B>
107where
108    B: Body,
109    B::Data: Into<Bytes>,
110{
111    type Data = BytesBuf;
112    type Error = Status;
113
114    fn is_end_stream(&self) -> bool {
115        self.0.is_end_stream()
116    }
117
118    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
119        let item = try_ready!(self.0.poll_data().map_err(Status::map_error));
120        Ok(item.map(|buf| buf.into().into_buf()).into())
121    }
122
123    fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error> {
124        self.0.poll_trailers().map_err(Status::map_error)
125    }
126}
127
128mod sealed {
129    pub trait Sealed {}
130}