apache_dubbo/triple/codec/
buffer.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use bytes::buf::UninitSlice;
19use bytes::{Buf, BufMut, BytesMut};
20
21/// A specialized buffer to decode gRPC messages from.
22#[derive(Debug)]
23pub struct DecodeBuf<'a> {
24    buf: &'a mut BytesMut,
25    len: usize,
26}
27
28/// A specialized buffer to encode gRPC messages into.
29#[derive(Debug)]
30pub struct EncodeBuf<'a> {
31    buf: &'a mut BytesMut,
32}
33
34impl<'a> DecodeBuf<'a> {
35    pub fn new(buf: &'a mut BytesMut, len: usize) -> Self {
36        DecodeBuf { buf, len }
37    }
38}
39
40impl Buf for DecodeBuf<'_> {
41    #[inline]
42    fn remaining(&self) -> usize {
43        self.len
44    }
45
46    #[inline]
47    fn chunk(&self) -> &[u8] {
48        let ret = self.buf.chunk();
49
50        if ret.len() > self.len {
51            &ret[..self.len]
52        } else {
53            ret
54        }
55    }
56
57    #[inline]
58    fn advance(&mut self, cnt: usize) {
59        assert!(cnt <= self.len);
60        self.buf.advance(cnt);
61        self.len -= cnt;
62    }
63}
64
65impl<'a> EncodeBuf<'a> {
66    pub fn new(buf: &'a mut BytesMut) -> Self {
67        EncodeBuf { buf }
68    }
69}
70
71impl EncodeBuf<'_> {
72    /// Reserves capacity for at least `additional` more bytes to be inserted
73    /// into the buffer.
74    ///
75    /// More than `additional` bytes may be reserved in order to avoid frequent
76    /// reallocations. A call to `reserve` may result in an allocation.
77    #[inline]
78    pub fn reserve(&mut self, additional: usize) {
79        self.buf.reserve(additional);
80    }
81}
82
83unsafe impl BufMut for EncodeBuf<'_> {
84    #[inline]
85    fn remaining_mut(&self) -> usize {
86        self.buf.remaining_mut()
87    }
88
89    #[inline]
90    unsafe fn advance_mut(&mut self, cnt: usize) {
91        self.buf.advance_mut(cnt)
92    }
93
94    #[inline]
95    fn chunk_mut(&mut self) -> &mut UninitSlice {
96        self.buf.chunk_mut()
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103
104    #[test]
105    fn decode_buf() {
106        let mut payload = BytesMut::with_capacity(100);
107        payload.put(&vec![0u8; 50][..]);
108        let mut buf = DecodeBuf::new(&mut payload, 20);
109
110        assert_eq!(buf.len, 20);
111        assert_eq!(buf.remaining(), 20);
112        assert_eq!(buf.chunk().len(), 20);
113
114        buf.advance(10);
115        assert_eq!(buf.remaining(), 10);
116
117        let mut out = [0; 5];
118        buf.copy_to_slice(&mut out);
119        assert_eq!(buf.remaining(), 5);
120        assert_eq!(buf.chunk().len(), 5);
121
122        assert_eq!(buf.copy_to_bytes(5).len(), 5);
123        assert!(!buf.has_remaining());
124    }
125
126    #[test]
127    fn encode_buf() {
128        let mut bytes = BytesMut::with_capacity(100);
129        let mut buf = EncodeBuf::new(&mut bytes);
130
131        let initial = buf.remaining_mut();
132        unsafe { buf.advance_mut(20) };
133        assert_eq!(buf.remaining_mut(), initial - 20);
134
135        buf.put_u8(b'a');
136        assert_eq!(buf.remaining_mut(), initial - 20 - 1);
137    }
138}