opensrv_clickhouse/binary/
read_ex.rs1use std::io;
16use std::mem::MaybeUninit;
17
18use micromarshal::Unmarshal;
19
20use crate::errors::DriverError;
21use crate::errors::Error;
22use crate::errors::Result;
23use crate::types::column::StringPool;
24use crate::types::StatBuffer;
25
26pub trait ReadEx {
27    fn read_bytes(&mut self, rv: &mut [u8]) -> Result<()>;
28    fn read_scalar<V>(&mut self) -> Result<V>
29    where
30        V: Copy + Unmarshal<V> + StatBuffer;
31    fn read_string(&mut self) -> Result<String>;
32    fn read_len_encode_bytes(&mut self) -> Result<Vec<u8>>;
33    fn skip_string(&mut self) -> Result<()>;
34    fn read_uvarint(&mut self) -> Result<u64>;
35    fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> Result<()>;
36}
37
38const MAX_STACK_BUFFER_LEN: usize = 1024;
39
40impl<T> ReadEx for T
41where
42    T: io::Read,
43{
44    fn read_bytes(&mut self, rv: &mut [u8]) -> Result<()> {
45        let mut i = 0;
46        while i < rv.len() {
47            let res_nread = {
48                let buf = &mut rv[i..];
49                self.read(buf)
50            };
51            match res_nread {
52                Ok(0) => {
53                    let ret = io::Error::new(io::ErrorKind::WouldBlock, "would block");
54                    return Err(ret.into());
55                }
56                Ok(nread) => i += nread,
57                Err(e) => return Err(From::from(e)),
58            }
59        }
60        Ok(())
61    }
62
63    fn read_scalar<V>(&mut self) -> Result<V>
64    where
65        V: Copy + Unmarshal<V> + StatBuffer,
66    {
67        let mut buffer = V::buffer();
68        self.read_bytes(buffer.as_mut())?;
69        Ok(V::unmarshal(buffer.as_ref()))
70    }
71
72    fn read_string(&mut self) -> Result<String> {
73        let str_len = self.read_uvarint()? as usize;
74        let mut buffer = vec![0_u8; str_len];
75        self.read_bytes(buffer.as_mut())?;
76        Ok(String::from_utf8(buffer)?)
77    }
78
79    fn read_len_encode_bytes(&mut self) -> Result<Vec<u8>> {
80        let str_len = self.read_uvarint()? as usize;
81        let mut buffer = vec![0_u8; str_len];
82        self.read_bytes(buffer.as_mut())?;
83
84        Ok(buffer)
85    }
86
87    fn skip_string(&mut self) -> Result<()> {
88        let str_len = self.read_uvarint()? as usize;
89
90        if str_len <= MAX_STACK_BUFFER_LEN {
91            unsafe {
92                let mut buffer: [MaybeUninit<u8>; MAX_STACK_BUFFER_LEN] =
93                    MaybeUninit::uninit().assume_init();
94                self.read_bytes(
95                    &mut *(&mut buffer[..str_len] as *mut [MaybeUninit<u8>] as *mut [u8]),
96                )?;
97            }
98        } else {
99            let mut buffer = vec![0_u8; str_len];
100            self.read_bytes(buffer.as_mut())?;
101        }
102
103        Ok(())
104    }
105
106    fn read_uvarint(&mut self) -> Result<u64> {
107        let mut x = 0_u64;
108        let mut s = 0_u32;
109        let mut i = 0_usize;
110        loop {
111            let b: u8 = self.read_scalar()?;
112
113            if b < 0x80 {
114                if i == 9 && b > 1 {
115                    return Err(Error::Driver(DriverError::Overflow));
116                }
117                return Ok(x | (u64::from(b) << s));
118            }
119
120            x |= u64::from(b & 0x7f) << s;
121            s += 7;
122
123            i += 1;
124        }
125    }
126
127    fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> Result<()> {
128        let str_len = self.read_uvarint()? as usize;
129        let buffer = pool.allocate(str_len);
130        self.read_bytes(buffer)?;
131        Ok(())
132    }
133}