Skip to main content

statsig_rust/specs_response/
proto_stream_reader.rs

1use std::io::Read;
2
3use crate::{
4    networking::{ResponseData, ResponseDataStream},
5    StatsigErr,
6};
7use brotli::Decompressor;
8use bytes::BytesMut;
9
10pub const BUFFER_SIZE: usize = 4096;
11
12pub struct ProtoStreamReader<'a> {
13    brotli_decompressor: Decompressor<StreamBorrower<'a>>,
14
15    scratch: [u8; BUFFER_SIZE],
16    buf: BytesMut,
17}
18
19impl<'a> ProtoStreamReader<'a> {
20    pub fn new(data: &'a mut ResponseData) -> Self {
21        let stream_borrower = StreamBorrower::new(data);
22        let brotli_decompressor = Decompressor::new(stream_borrower, BUFFER_SIZE);
23
24        Self {
25            brotli_decompressor,
26            scratch: [0u8; BUFFER_SIZE],
27            buf: BytesMut::new(),
28        }
29    }
30
31    pub fn read_next_delimited_proto(&mut self) -> Result<BytesMut, StatsigErr> {
32        let required_len = self.read_length_delimiter()?;
33
34        while self.buf.len() < required_len {
35            match self.brotli_decompressor.read(&mut self.scratch) {
36                Ok(0) => {
37                    return Ok(self.buf.split_to(required_len));
38                }
39                Ok(n) => {
40                    self.buf.extend_from_slice(&self.scratch[..n]);
41                }
42                Err(e) => {
43                    return Err(new_parse_err(e));
44                }
45            }
46        }
47
48        Ok(self.buf.split_to(required_len))
49    }
50
51    fn read_length_delimiter(&mut self) -> Result<usize, StatsigErr> {
52        let len_buf = &mut [0u8; 10];
53
54        let read_len = self
55            .brotli_decompressor
56            .read(len_buf)
57            .map_err(new_parse_err)?;
58
59        if read_len > 0 {
60            self.buf.extend_from_slice(&len_buf[..read_len]);
61        }
62
63        let data_len = prost::decode_length_delimiter(self.buf.as_ref()).map_err(new_parse_err)?;
64        let required_len = prost::length_delimiter_len(data_len) + data_len;
65
66        Ok(required_len)
67    }
68}
69
70fn new_parse_err<E>(err_string: E) -> StatsigErr
71where
72    E: std::fmt::Display,
73{
74    StatsigErr::ProtobufParseError("BrotliDecompressor".to_string(), err_string.to_string())
75}
76
77struct StreamBorrower<'a> {
78    stream: &'a mut dyn ResponseDataStream,
79}
80
81impl<'a> StreamBorrower<'a> {
82    pub fn new(data: &'a mut ResponseData) -> Self {
83        Self {
84            stream: data.get_stream_mut(),
85        }
86    }
87}
88
89impl<'a> std::io::Read for StreamBorrower<'a> {
90    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
91        self.stream.read(buf)
92    }
93}