tower_grpc/
codec.rs

1use crate::body::{BoxBody, HttpBody};
2use crate::generic::{DecodeBuf, EncodeBuf};
3
4use bytes::BufMut;
5use futures::{Poll, Stream};
6use prost::DecodeError;
7use prost::Message;
8
9use std::fmt;
10use std::marker::PhantomData;
11
/// Protobuf codec
///
/// A zero-sized marker type: `T` is the message type that gets encoded and
/// `U` is the message type that gets decoded.
pub struct Codec<T, U>(PhantomData<(T, U)>);

// `Debug` is written by hand rather than derived so that it carries no
// `T: Debug` / `U: Debug` bounds, in line with the hand-written `Clone` impl.
// The printed form is the same as the derived one.
impl<T, U> fmt::Debug for Codec<T, U> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("Codec").field(&self.0).finish()
    }
}
15
/// Protobuf message encoder
///
/// A zero-sized marker type; `T` is the message type being encoded.
pub struct Encoder<T>(PhantomData<T>);

// `Debug` is written by hand rather than derived so that it carries no
// `T: Debug` bound, in line with the hand-written `Clone` impl. The printed
// form is the same as the derived one.
impl<T> fmt::Debug for Encoder<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("Encoder").field(&self.0).finish()
    }
}
18
/// Protobuf message decoder
///
/// A zero-sized marker type; `T` is the message type being decoded.
pub struct Decoder<T>(PhantomData<T>);

// `Debug` is written by hand rather than derived so that it carries no
// `T: Debug` bound, in line with the hand-written `Clone` impl. The printed
// form is the same as the derived one.
impl<T> fmt::Debug for Decoder<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("Decoder").field(&self.0).finish()
    }
}
21
22/// A stream of inbound gRPC messages
23pub type Streaming<T, B = BoxBody> = crate::generic::Streaming<Decoder<T>, B>;
24
25pub(crate) use crate::generic::Direction;
26
27/// A protobuf encoded gRPC response body
28pub struct Encode<T>
29where
30    T: Stream,
31{
32    inner: crate::generic::Encode<Encoder<T::Item>, T>,
33}
34
35// ===== impl Codec =====
36
37impl<T, U> Codec<T, U>
38where
39    T: Message,
40    U: Message + Default,
41{
42    /// Create a new protobuf codec
43    pub fn new() -> Self {
44        Codec(PhantomData)
45    }
46}
47
48impl<T, U> crate::generic::Codec for Codec<T, U>
49where
50    T: Message,
51    U: Message + Default,
52{
53    type Encode = T;
54    type Encoder = Encoder<T>;
55    type Decode = U;
56    type Decoder = Decoder<U>;
57
58    fn encoder(&mut self) -> Self::Encoder {
59        Encoder(PhantomData)
60    }
61
62    fn decoder(&mut self) -> Self::Decoder {
63        Decoder(PhantomData)
64    }
65}
66
67impl<T, U> Clone for Codec<T, U> {
68    fn clone(&self) -> Self {
69        Codec(PhantomData)
70    }
71}
72
73// ===== impl Encoder =====
74
75impl<T> Encoder<T>
76where
77    T: Message,
78{
79    pub fn new() -> Self {
80        Encoder(PhantomData)
81    }
82}
83
84impl<T> crate::generic::Encoder for Encoder<T>
85where
86    T: Message,
87{
88    type Item = T;
89
90    /// Protocol buffer gRPC content type
91    const CONTENT_TYPE: &'static str = "application/grpc+proto";
92
93    fn encode(&mut self, item: T, buf: &mut EncodeBuf<'_>) -> Result<(), crate::Status> {
94        let len = item.encoded_len();
95
96        if buf.remaining_mut() < len {
97            buf.reserve(len);
98        }
99
100        item.encode(buf)
101            .map_err(|_| unreachable!("Message only errors if not enough space"))
102    }
103}
104
105impl<T> Clone for Encoder<T> {
106    fn clone(&self) -> Self {
107        Encoder(PhantomData)
108    }
109}
110
111// ===== impl Decoder =====
112
113impl<T> Decoder<T>
114where
115    T: Message + Default,
116{
117    /// Returns a new decoder
118    pub fn new() -> Self {
119        Decoder(PhantomData)
120    }
121}
122
123fn from_decode_error(error: DecodeError) -> crate::Status {
124    // Map Protobuf parse errors to an INTERNAL status code, as per
125    // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
126    crate::Status::new(crate::Code::Internal, error.to_string())
127}
128
129impl<T> crate::generic::Decoder for Decoder<T>
130where
131    T: Message + Default,
132{
133    type Item = T;
134
135    fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<T, crate::Status> {
136        Message::decode(buf).map_err(from_decode_error)
137    }
138}
139
140impl<T> Clone for Decoder<T> {
141    fn clone(&self) -> Self {
142        Decoder(PhantomData)
143    }
144}
145
146// ===== impl Encode =====
147
148impl<T> Encode<T>
149where
150    T: Stream<Error = crate::Status>,
151    T::Item: ::prost::Message,
152{
153    pub(crate) fn new(inner: crate::generic::Encode<Encoder<T::Item>, T>) -> Self {
154        Encode { inner }
155    }
156}
157
158impl<T> HttpBody for Encode<T>
159where
160    T: Stream<Error = crate::Status>,
161    T::Item: ::prost::Message,
162{
163    type Data = <crate::generic::Encode<Encoder<T::Item>, T> as HttpBody>::Data;
164    type Error = <crate::generic::Encode<Encoder<T::Item>, T> as HttpBody>::Error;
165
166    fn is_end_stream(&self) -> bool {
167        self.inner.is_end_stream()
168    }
169
170    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
171        self.inner.poll_data()
172    }
173
174    fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error> {
175        self.inner.poll_trailers()
176    }
177}
178
179impl<T> fmt::Debug for Encode<T>
180where
181    T: Stream + fmt::Debug,
182    T::Item: fmt::Debug,
183    T::Error: fmt::Debug,
184{
185    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
186        fmt.debug_struct("Encode")
187            .field("inner", &self.inner)
188            .finish()
189    }
190}