apache_dubbo/triple/
encode.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::{pin::Pin, task::Poll};
18
19use crate::status::Status;
20use bytes::{BufMut, Bytes, BytesMut};
21use futures_core::{Stream, TryStream};
22use futures_util::{ready, StreamExt, TryStreamExt};
23use http_body::Body;
24use pin_project::pin_project;
25
26use super::compression::{compress, CompressionEncoding};
27use crate::triple::codec::{EncodeBuf, Encoder};
28
29#[allow(unused_must_use)]
30pub fn encode<E, B>(
31    mut encoder: E,
32    resp_body: B,
33    compression_encoding: Option<CompressionEncoding>,
34) -> impl TryStream<Ok = Bytes, Error = Status>
35where
36    E: Encoder<Error = Status>,
37    B: Stream<Item = Result<E::Item, Status>>,
38{
39    async_stream::stream! {
40        let mut buf = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
41        futures_util::pin_mut!(resp_body);
42
43        let (enable_compress, mut uncompression_buf) = match compression_encoding {
44            Some(CompressionEncoding::Gzip) => (true, BytesMut::with_capacity(super::consts::BUFFER_SIZE)),
45            None => (false, BytesMut::new())
46        };
47
48        loop {
49            match resp_body.next().await {
50                Some(Ok(item)) => {
51                    // 编码数据到缓冲中
52                    buf.reserve(super::consts::HEADER_SIZE);
53                    unsafe {
54                        buf.advance_mut(super::consts::HEADER_SIZE);
55                    }
56
57                    if enable_compress {
58                        uncompression_buf.clear();
59
60                        encoder.encode(item, &mut EncodeBuf::new(&mut uncompression_buf))
61                            .map_err(|_e| crate::status::Status::new(crate::status::Code::Internal, "encode error".to_string()));
62
63                        let len = uncompression_buf.len();
64                        compress(compression_encoding.unwrap(), &mut uncompression_buf, &mut buf, len)
65                            .map_err(|_| crate::status::Status::new(crate::status::Code::Internal, "compress error".to_string()));
66                    } else {
67                        encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(|_e| crate::status::Status::new(crate::status::Code::Internal, "encode error".to_string()));
68                    }
69
70
71                    let len = buf.len() - super::consts::HEADER_SIZE;
72                    {
73                        let mut buf = &mut buf[..super::consts::HEADER_SIZE];
74                        buf.put_u8(enable_compress as u8);
75                        buf.put_u32(len as u32);
76                    }
77
78                    yield Ok(buf.split_to(len + super::consts::HEADER_SIZE).freeze());
79                },
80                Some(Err(err)) => yield Err(err.into()),
81                None => break,
82            }
83        }
84    }
85}
86
87pub fn encode_server<E, B>(
88    encoder: E,
89    body: B,
90    compression_encoding: Option<CompressionEncoding>,
91) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
92where
93    E: Encoder<Error = Status>,
94    B: Stream<Item = Result<E::Item, Status>>,
95{
96    let s = encode(encoder, body, compression_encoding).into_stream();
97    EncodeBody::new_server(s)
98}
99
100pub fn encode_client<E, B>(
101    encoder: E,
102    body: B,
103    compression_encoding: Option<CompressionEncoding>,
104) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
105where
106    E: Encoder<Error = Status>,
107    B: Stream<Item = E::Item>,
108{
109    let s = encode(encoder, body.map(Ok), compression_encoding).into_stream();
110    EncodeBody::new_client(s)
111}
112
113#[derive(Debug)]
114enum Role {
115    Server,
116    Client,
117}
118#[pin_project]
119pub struct EncodeBody<S> {
120    #[pin]
121    inner: S,
122    role: Role,
123    is_end_stream: bool,
124    error: Option<crate::status::Status>,
125}
126
127impl<S> EncodeBody<S> {
128    pub fn new_server(inner: S) -> Self {
129        Self {
130            inner,
131            role: Role::Server,
132            is_end_stream: false,
133            error: None,
134        }
135    }
136
137    pub fn new_client(inner: S) -> Self {
138        Self {
139            inner,
140            role: Role::Client,
141            is_end_stream: false,
142            error: None,
143        }
144    }
145}
146
147impl<S> Body for EncodeBody<S>
148where
149    S: Stream<Item = Result<Bytes, crate::status::Status>>,
150{
151    type Data = Bytes;
152
153    type Error = crate::status::Status;
154
155    fn is_end_stream(&self) -> bool {
156        self.is_end_stream
157    }
158
159    fn poll_data(
160        self: Pin<&mut Self>,
161        cx: &mut std::task::Context<'_>,
162    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
163        let mut self_proj = self.project();
164        match ready!(self_proj.inner.try_poll_next_unpin(cx)) {
165            Some(Ok(d)) => Some(Ok(d)).into(),
166            Some(Err(status)) => {
167                *self_proj.error = Some(status);
168                None.into()
169            }
170            None => None.into(),
171        }
172    }
173
174    fn poll_trailers(
175        self: Pin<&mut Self>,
176        _cx: &mut std::task::Context<'_>,
177    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
178        let self_proj = self.project();
179        if *self_proj.is_end_stream {
180            return Poll::Ready(Ok(None));
181        }
182
183        let status = if let Some(status) = self_proj.error.take() {
184            *self_proj.is_end_stream = true;
185            status
186        } else {
187            crate::status::Status::new(
188                crate::status::Code::Ok,
189                "poll trailer successfully.".to_string(),
190            )
191        };
192        let http = status.to_http();
193
194        Poll::Ready(Ok(Some(http.headers().to_owned())))
195    }
196}