apache_dubbo/triple/
compression.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19
20use bytes::{Buf, BufMut, BytesMut};
21use flate2::read::{GzDecoder, GzEncoder};
22use flate2::Compression;
23use lazy_static::lazy_static;
24
25pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";
26pub const GRPC_ENCODING: &str = "grpc-encoding";
27
28#[derive(Debug, Clone, Copy)]
29pub enum CompressionEncoding {
30    Gzip,
31}
32
33lazy_static! {
34    pub static ref COMPRESSIONS: HashMap<String, Option<CompressionEncoding>> = {
35        let mut v = HashMap::new();
36        v.insert("gzip".to_string(), Some(CompressionEncoding::Gzip));
37        v
38    };
39}
40
41impl CompressionEncoding {
42    pub fn from_accept_encoding(header: &http::HeaderMap) -> Option<CompressionEncoding> {
43        let accept_encoding = header.get(GRPC_ACCEPT_ENCODING)?;
44        let encodings = accept_encoding.to_str().ok()?;
45
46        encodings
47            .trim()
48            .split(',')
49            .map(|s| s.trim())
50            .into_iter()
51            .find_map(|s| match s {
52                "gzip" => Some(CompressionEncoding::Gzip),
53                _ => None,
54            })
55    }
56
57    pub fn into_header_value(self) -> http::HeaderValue {
58        match self {
59            CompressionEncoding::Gzip => http::HeaderValue::from_static("gzip"),
60        }
61    }
62}
63
64pub fn compress(
65    encoding: CompressionEncoding,
66    src: &mut BytesMut,
67    dst: &mut BytesMut,
68    len: usize,
69) -> Result<(), std::io::Error> {
70    dst.reserve(len);
71
72    match encoding {
73        CompressionEncoding::Gzip => {
74            let mut en = GzEncoder::new(src.reader(), Compression::default());
75
76            let mut dst_writer = dst.writer();
77
78            std::io::copy(&mut en, &mut dst_writer)?;
79        }
80    }
81
82    Ok(())
83}
84
85pub fn decompress(
86    encoding: CompressionEncoding,
87    src: &mut BytesMut,
88    dst: &mut BytesMut,
89    len: usize,
90) -> Result<(), std::io::Error> {
91    let capacity = len * 2;
92    dst.reserve(capacity);
93
94    match encoding {
95        CompressionEncoding::Gzip => {
96            let mut de = GzDecoder::new(src.reader());
97
98            let mut dst_writer = dst.writer();
99
100            std::io::copy(&mut de, &mut dst_writer)?;
101        }
102    }
103    Ok(())
104}
105
106#[test]
107fn test_compress() {
108    let mut src = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
109    src.put(&b"test compress"[..]);
110    let mut dst = BytesMut::new();
111    let len = src.len();
112    src.reserve(len);
113
114    compress(CompressionEncoding::Gzip, &mut src, &mut dst, len);
115    println!("src: {:?}, dst: {:?}", src, dst);
116
117    let mut de_dst = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
118    let de_len = dst.len();
119    decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len);
120
121    println!("src: {:?}, dst: {:?}", dst, de_dst);
122}