summavy_sstable/
delta.rs

1use std::io::{self, BufWriter, Write};
2use std::ops::Range;
3
4use common::CountingWriter;
5
6use super::value::ValueWriter;
7use super::{value, vint, BlockReader};
8
/// Exclusive upper bound for a length that can be stored in one nibble:
/// `encode_keep_add` only packs the keep/add pair into a single byte when
/// both lengths are below this value.
const FOUR_BIT_LIMITS: usize = 1 << 4;
/// Header byte announcing that the keep and add lengths follow as two vints
/// instead of being packed into the header byte itself.
const VINT_MODE: u8 = 1;
/// Number of pending key bytes above which `flush_block_if_required` flushes
/// the current block.
const BLOCK_LEN: usize = 32_000;
12
13pub struct DeltaWriter<W, TValueWriter>
14where W: io::Write
15{
16    block: Vec<u8>,
17    write: CountingWriter<BufWriter<W>>,
18    value_writer: TValueWriter,
19    // Only here to avoid allocations.
20    stateless_buffer: Vec<u8>,
21}
22
23impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
24where
25    W: io::Write,
26    TValueWriter: ValueWriter,
27{
28    pub fn new(wrt: W) -> Self {
29        DeltaWriter {
30            block: Vec::with_capacity(BLOCK_LEN * 2),
31            write: CountingWriter::wrap(BufWriter::new(wrt)),
32            value_writer: TValueWriter::default(),
33            stateless_buffer: Vec::new(),
34        }
35    }
36}
37
38impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
39where
40    W: io::Write,
41    TValueWriter: value::ValueWriter,
42{
43    pub fn flush_block(&mut self) -> io::Result<Option<Range<usize>>> {
44        if self.block.is_empty() {
45            return Ok(None);
46        }
47        let start_offset = self.write.written_bytes() as usize;
48        let buffer: &mut Vec<u8> = &mut self.stateless_buffer;
49        self.value_writer.serialize_block(buffer);
50        self.value_writer.clear();
51        let block_len = buffer.len() + self.block.len();
52        self.write.write_all(&(block_len as u32).to_le_bytes())?;
53        self.write.write_all(&buffer[..])?;
54        self.write.write_all(&self.block[..])?;
55        let end_offset = self.write.written_bytes() as usize;
56        self.block.clear();
57        buffer.clear();
58        Ok(Some(start_offset..end_offset))
59    }
60
61    fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) {
62        if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS {
63            let b = (keep_len | add_len << 4) as u8;
64            self.block.extend_from_slice(&[b])
65        } else {
66            let mut buf = [VINT_MODE; 20];
67            let mut len = 1 + vint::serialize(keep_len as u64, &mut buf[1..]);
68            len += vint::serialize(add_len as u64, &mut buf[len..]);
69            self.block.extend_from_slice(&buf[..len])
70        }
71    }
72
73    pub(crate) fn write_suffix(&mut self, common_prefix_len: usize, suffix: &[u8]) {
74        let keep_len = common_prefix_len;
75        let add_len = suffix.len();
76        self.encode_keep_add(keep_len, add_len);
77        self.block.extend_from_slice(suffix);
78    }
79
80    pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) {
81        self.value_writer.write(value);
82    }
83
84    pub fn flush_block_if_required(&mut self) -> io::Result<Option<Range<usize>>> {
85        if self.block.len() > BLOCK_LEN {
86            return self.flush_block();
87        }
88        Ok(None)
89    }
90
91    pub fn finish(self) -> CountingWriter<BufWriter<W>> {
92        self.write
93    }
94}
95
96pub struct DeltaReader<'a, TValueReader> {
97    common_prefix_len: usize,
98    suffix_range: Range<usize>,
99    value_reader: TValueReader,
100    block_reader: BlockReader<'a>,
101    idx: usize,
102}
103
104impl<'a, TValueReader> DeltaReader<'a, TValueReader>
105where TValueReader: value::ValueReader
106{
107    pub fn new<R: io::Read + 'a>(reader: R) -> Self {
108        DeltaReader {
109            idx: 0,
110            common_prefix_len: 0,
111            suffix_range: 0..0,
112            value_reader: TValueReader::default(),
113            block_reader: BlockReader::new(Box::new(reader)),
114        }
115    }
116
117    pub fn empty() -> Self {
118        DeltaReader::new(&b""[..])
119    }
120
121    fn deserialize_vint(&mut self) -> u64 {
122        self.block_reader.deserialize_u64()
123    }
124
125    fn read_keep_add(&mut self) -> Option<(usize, usize)> {
126        let b = {
127            let buf = &self.block_reader.buffer();
128            if buf.is_empty() {
129                return None;
130            }
131            buf[0]
132        };
133        self.block_reader.advance(1);
134        match b {
135            VINT_MODE => {
136                let keep = self.deserialize_vint() as usize;
137                let add = self.deserialize_vint() as usize;
138                Some((keep, add))
139            }
140            b => {
141                let keep = (b & 0b1111) as usize;
142                let add = (b >> 4) as usize;
143                Some((keep, add))
144            }
145        }
146    }
147
148    fn read_delta_key(&mut self) -> bool {
149        let Some((keep, add)) = self.read_keep_add() else {
150            return false;
151        };
152        self.common_prefix_len = keep;
153        let suffix_start = self.block_reader.offset();
154        self.suffix_range = suffix_start..(suffix_start + add);
155        self.block_reader.advance(add);
156        true
157    }
158
159    pub fn advance(&mut self) -> io::Result<bool> {
160        if self.block_reader.buffer().is_empty() {
161            if !self.block_reader.read_block()? {
162                return Ok(false);
163            }
164            let consumed_len = self.value_reader.load(self.block_reader.buffer())?;
165            self.block_reader.advance(consumed_len);
166            self.idx = 0;
167        } else {
168            self.idx += 1;
169        }
170        if !self.read_delta_key() {
171            return Ok(false);
172        }
173        Ok(true)
174    }
175
176    #[inline(always)]
177    pub fn common_prefix_len(&self) -> usize {
178        self.common_prefix_len
179    }
180
181    #[inline(always)]
182    pub fn suffix(&self) -> &[u8] {
183        self.block_reader.buffer_from_to(self.suffix_range.clone())
184    }
185
186    #[inline(always)]
187    pub fn value(&self) -> &TValueReader::Value {
188        self.value_reader.value(self.idx)
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::DeltaReader;
    use crate::value::U64MonotonicValueReader;

    /// A reader built over no bytes has nothing to advance to.
    #[test]
    fn test_empty() {
        let mut reader: DeltaReader<U64MonotonicValueReader> = DeltaReader::empty();
        let advanced = reader.advance().unwrap();
        assert!(!advanced);
    }
}