apache_dubbo/triple/
encode.rs1use std::{pin::Pin, task::Poll};
18
19use crate::status::Status;
20use bytes::{BufMut, Bytes, BytesMut};
21use futures_core::{Stream, TryStream};
22use futures_util::{ready, StreamExt, TryStreamExt};
23use http_body::Body;
24use pin_project::pin_project;
25
26use super::compression::{compress, CompressionEncoding};
27use crate::triple::codec::{EncodeBuf, Encoder};
28
29#[allow(unused_must_use)]
30pub fn encode<E, B>(
31 mut encoder: E,
32 resp_body: B,
33 compression_encoding: Option<CompressionEncoding>,
34) -> impl TryStream<Ok = Bytes, Error = Status>
35where
36 E: Encoder<Error = Status>,
37 B: Stream<Item = Result<E::Item, Status>>,
38{
39 async_stream::stream! {
40 let mut buf = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
41 futures_util::pin_mut!(resp_body);
42
43 let (enable_compress, mut uncompression_buf) = match compression_encoding {
44 Some(CompressionEncoding::Gzip) => (true, BytesMut::with_capacity(super::consts::BUFFER_SIZE)),
45 None => (false, BytesMut::new())
46 };
47
48 loop {
49 match resp_body.next().await {
50 Some(Ok(item)) => {
51 buf.reserve(super::consts::HEADER_SIZE);
53 unsafe {
54 buf.advance_mut(super::consts::HEADER_SIZE);
55 }
56
57 if enable_compress {
58 uncompression_buf.clear();
59
60 encoder.encode(item, &mut EncodeBuf::new(&mut uncompression_buf))
61 .map_err(|_e| crate::status::Status::new(crate::status::Code::Internal, "encode error".to_string()));
62
63 let len = uncompression_buf.len();
64 compress(compression_encoding.unwrap(), &mut uncompression_buf, &mut buf, len)
65 .map_err(|_| crate::status::Status::new(crate::status::Code::Internal, "compress error".to_string()));
66 } else {
67 encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(|_e| crate::status::Status::new(crate::status::Code::Internal, "encode error".to_string()));
68 }
69
70
71 let len = buf.len() - super::consts::HEADER_SIZE;
72 {
73 let mut buf = &mut buf[..super::consts::HEADER_SIZE];
74 buf.put_u8(enable_compress as u8);
75 buf.put_u32(len as u32);
76 }
77
78 yield Ok(buf.split_to(len + super::consts::HEADER_SIZE).freeze());
79 },
80 Some(Err(err)) => yield Err(err.into()),
81 None => break,
82 }
83 }
84 }
85}
86
87pub fn encode_server<E, B>(
88 encoder: E,
89 body: B,
90 compression_encoding: Option<CompressionEncoding>,
91) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
92where
93 E: Encoder<Error = Status>,
94 B: Stream<Item = Result<E::Item, Status>>,
95{
96 let s = encode(encoder, body, compression_encoding).into_stream();
97 EncodeBody::new_server(s)
98}
99
100pub fn encode_client<E, B>(
101 encoder: E,
102 body: B,
103 compression_encoding: Option<CompressionEncoding>,
104) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
105where
106 E: Encoder<Error = Status>,
107 B: Stream<Item = E::Item>,
108{
109 let s = encode(encoder, body.map(Ok), compression_encoding).into_stream();
110 EncodeBody::new_client(s)
111}
112
113#[derive(Debug)]
114enum Role {
115 Server,
116 Client,
117}
118#[pin_project]
119pub struct EncodeBody<S> {
120 #[pin]
121 inner: S,
122 role: Role,
123 is_end_stream: bool,
124 error: Option<crate::status::Status>,
125}
126
127impl<S> EncodeBody<S> {
128 pub fn new_server(inner: S) -> Self {
129 Self {
130 inner,
131 role: Role::Server,
132 is_end_stream: false,
133 error: None,
134 }
135 }
136
137 pub fn new_client(inner: S) -> Self {
138 Self {
139 inner,
140 role: Role::Client,
141 is_end_stream: false,
142 error: None,
143 }
144 }
145}
146
147impl<S> Body for EncodeBody<S>
148where
149 S: Stream<Item = Result<Bytes, crate::status::Status>>,
150{
151 type Data = Bytes;
152
153 type Error = crate::status::Status;
154
155 fn is_end_stream(&self) -> bool {
156 self.is_end_stream
157 }
158
159 fn poll_data(
160 self: Pin<&mut Self>,
161 cx: &mut std::task::Context<'_>,
162 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
163 let mut self_proj = self.project();
164 match ready!(self_proj.inner.try_poll_next_unpin(cx)) {
165 Some(Ok(d)) => Some(Ok(d)).into(),
166 Some(Err(status)) => {
167 *self_proj.error = Some(status);
168 None.into()
169 }
170 None => None.into(),
171 }
172 }
173
174 fn poll_trailers(
175 self: Pin<&mut Self>,
176 _cx: &mut std::task::Context<'_>,
177 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
178 let self_proj = self.project();
179 if *self_proj.is_end_stream {
180 return Poll::Ready(Ok(None));
181 }
182
183 let status = if let Some(status) = self_proj.error.take() {
184 *self_proj.is_end_stream = true;
185 status
186 } else {
187 crate::status::Status::new(
188 crate::status::Code::Ok,
189 "poll trailer successfully.".to_string(),
190 )
191 };
192 let http = status.to_http();
193
194 Poll::Ready(Ok(Some(http.headers().to_owned())))
195 }
196}