statsig_rust/specs_response/
proto_stream_reader.rs1use std::io::Read;
2
3use crate::{
4 networking::{ResponseData, ResponseDataStream},
5 StatsigErr,
6};
7use brotli::Decompressor;
8use bytes::BytesMut;
9
/// Size in bytes (4 KiB) of the scratch buffer used for each read from the
/// decompressor; the same value is passed to the Brotli decompressor as its
/// buffer size.
pub const BUFFER_SIZE: usize = 4 * 1024;
11
12pub struct ProtoStreamReader<'a> {
13 brotli_decompressor: Decompressor<StreamBorrower<'a>>,
14
15 scratch: [u8; BUFFER_SIZE],
16 buf: BytesMut,
17}
18
19impl<'a> ProtoStreamReader<'a> {
20 pub fn new(data: &'a mut ResponseData) -> Self {
21 let stream_borrower = StreamBorrower::new(data);
22 let brotli_decompressor = Decompressor::new(stream_borrower, BUFFER_SIZE);
23
24 Self {
25 brotli_decompressor,
26 scratch: [0u8; BUFFER_SIZE],
27 buf: BytesMut::new(),
28 }
29 }
30
31 pub fn read_next_delimited_proto(&mut self) -> Result<BytesMut, StatsigErr> {
32 let required_len = self.read_length_delimiter()?;
33
34 while self.buf.len() < required_len {
35 match self.brotli_decompressor.read(&mut self.scratch) {
36 Ok(0) => {
37 return Ok(self.buf.split_to(required_len));
38 }
39 Ok(n) => {
40 self.buf.extend_from_slice(&self.scratch[..n]);
41 }
42 Err(e) => {
43 return Err(new_parse_err(e));
44 }
45 }
46 }
47
48 Ok(self.buf.split_to(required_len))
49 }
50
51 fn read_length_delimiter(&mut self) -> Result<usize, StatsigErr> {
52 let len_buf = &mut [0u8; 10];
53
54 let read_len = self
55 .brotli_decompressor
56 .read(len_buf)
57 .map_err(new_parse_err)?;
58
59 if read_len > 0 {
60 self.buf.extend_from_slice(&len_buf[..read_len]);
61 }
62
63 let data_len = prost::decode_length_delimiter(self.buf.as_ref()).map_err(new_parse_err)?;
64 let required_len = prost::length_delimiter_len(data_len) + data_len;
65
66 Ok(required_len)
67 }
68}
69
70fn new_parse_err<E>(err_string: E) -> StatsigErr
71where
72 E: std::fmt::Display,
73{
74 StatsigErr::ProtobufParseError("BrotliDecompressor".to_string(), err_string.to_string())
75}
76
77struct StreamBorrower<'a> {
78 stream: &'a mut dyn ResponseDataStream,
79}
80
81impl<'a> StreamBorrower<'a> {
82 pub fn new(data: &'a mut ResponseData) -> Self {
83 Self {
84 stream: data.get_stream_mut(),
85 }
86 }
87}
88
89impl<'a> std::io::Read for StreamBorrower<'a> {
90 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
91 self.stream.read(buf)
92 }
93}