1use crate::body::{BoxBody, HttpBody};
2use crate::generic::{DecodeBuf, EncodeBuf};
3
4use bytes::BufMut;
5use futures::{Poll, Stream};
6use prost::DecodeError;
7use prost::Message;
8
9use std::fmt;
10use std::marker::PhantomData;
11
/// Protobuf codec pairing an encodable message type `T` with a decodable
/// message type `U`.
///
/// The codec stores no data of its own; both type parameters are carried
/// purely at the type level through `PhantomData`.
#[derive(Debug)]
pub struct Codec<T, U>(PhantomData<(T, U)>);
15
/// Encoder for messages of type `T`.
///
/// Zero-sized: `T` is only tracked through `PhantomData`.
#[derive(Debug)]
pub struct Encoder<T>(PhantomData<T>);
18
/// Decoder for messages of type `T`.
///
/// Zero-sized: `T` is only tracked through `PhantomData`.
#[derive(Debug)]
pub struct Decoder<T>(PhantomData<T>);
21
/// `crate::generic::Streaming` specialized to decode items with
/// [`Decoder<T>`]; the body type `B` defaults to [`BoxBody`].
pub type Streaming<T, B = BoxBody> = crate::generic::Streaming<Decoder<T>, B>;
24
25pub(crate) use crate::generic::Direction;
26
/// Wrapper around `crate::generic::Encode` that fixes the encoder to
/// [`Encoder`] over the items of the stream `T`.
pub struct Encode<T>
where
    // The bound is required here because the field type names `T::Item`.
    T: Stream,
{
    /// The generic encoding body that all trait methods delegate to.
    inner: crate::generic::Encode<Encoder<T::Item>, T>,
}
34
35impl<T, U> Codec<T, U>
38where
39 T: Message,
40 U: Message + Default,
41{
42 pub fn new() -> Self {
44 Codec(PhantomData)
45 }
46}
47
48impl<T, U> crate::generic::Codec for Codec<T, U>
49where
50 T: Message,
51 U: Message + Default,
52{
53 type Encode = T;
54 type Encoder = Encoder<T>;
55 type Decode = U;
56 type Decoder = Decoder<U>;
57
58 fn encoder(&mut self) -> Self::Encoder {
59 Encoder(PhantomData)
60 }
61
62 fn decoder(&mut self) -> Self::Decoder {
63 Decoder(PhantomData)
64 }
65}
66
67impl<T, U> Clone for Codec<T, U> {
68 fn clone(&self) -> Self {
69 Codec(PhantomData)
70 }
71}
72
73impl<T> Encoder<T>
76where
77 T: Message,
78{
79 pub fn new() -> Self {
80 Encoder(PhantomData)
81 }
82}
83
84impl<T> crate::generic::Encoder for Encoder<T>
85where
86 T: Message,
87{
88 type Item = T;
89
90 const CONTENT_TYPE: &'static str = "application/grpc+proto";
92
93 fn encode(&mut self, item: T, buf: &mut EncodeBuf<'_>) -> Result<(), crate::Status> {
94 let len = item.encoded_len();
95
96 if buf.remaining_mut() < len {
97 buf.reserve(len);
98 }
99
100 item.encode(buf)
101 .map_err(|_| unreachable!("Message only errors if not enough space"))
102 }
103}
104
105impl<T> Clone for Encoder<T> {
106 fn clone(&self) -> Self {
107 Encoder(PhantomData)
108 }
109}
110
111impl<T> Decoder<T>
114where
115 T: Message + Default,
116{
117 pub fn new() -> Self {
119 Decoder(PhantomData)
120 }
121}
122
123fn from_decode_error(error: DecodeError) -> crate::Status {
124 crate::Status::new(crate::Code::Internal, error.to_string())
127}
128
129impl<T> crate::generic::Decoder for Decoder<T>
130where
131 T: Message + Default,
132{
133 type Item = T;
134
135 fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<T, crate::Status> {
136 Message::decode(buf).map_err(from_decode_error)
137 }
138}
139
140impl<T> Clone for Decoder<T> {
141 fn clone(&self) -> Self {
142 Decoder(PhantomData)
143 }
144}
145
146impl<T> Encode<T>
149where
150 T: Stream<Error = crate::Status>,
151 T::Item: ::prost::Message,
152{
153 pub(crate) fn new(inner: crate::generic::Encode<Encoder<T::Item>, T>) -> Self {
154 Encode { inner }
155 }
156}
157
158impl<T> HttpBody for Encode<T>
159where
160 T: Stream<Error = crate::Status>,
161 T::Item: ::prost::Message,
162{
163 type Data = <crate::generic::Encode<Encoder<T::Item>, T> as HttpBody>::Data;
164 type Error = <crate::generic::Encode<Encoder<T::Item>, T> as HttpBody>::Error;
165
166 fn is_end_stream(&self) -> bool {
167 self.inner.is_end_stream()
168 }
169
170 fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
171 self.inner.poll_data()
172 }
173
174 fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error> {
175 self.inner.poll_trailers()
176 }
177}
178
179impl<T> fmt::Debug for Encode<T>
180where
181 T: Stream + fmt::Debug,
182 T::Item: fmt::Debug,
183 T::Error: fmt::Debug,
184{
185 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
186 fmt.debug_struct("Encode")
187 .field("inner", &self.inner)
188 .finish()
189 }
190}