1use std::io::{self, BufWriter, Write};
2use std::ops::Range;
3
4use common::CountingWriter;
5
6use super::value::ValueWriter;
7use super::{value, vint, BlockReader};
8
9const FOUR_BIT_LIMITS: usize = 1 << 4;
10const VINT_MODE: u8 = 1u8;
11const BLOCK_LEN: usize = 32_000;
12
13pub struct DeltaWriter<W, TValueWriter>
14where W: io::Write
15{
16 block: Vec<u8>,
17 write: CountingWriter<BufWriter<W>>,
18 value_writer: TValueWriter,
19 stateless_buffer: Vec<u8>,
21}
22
23impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
24where
25 W: io::Write,
26 TValueWriter: ValueWriter,
27{
28 pub fn new(wrt: W) -> Self {
29 DeltaWriter {
30 block: Vec::with_capacity(BLOCK_LEN * 2),
31 write: CountingWriter::wrap(BufWriter::new(wrt)),
32 value_writer: TValueWriter::default(),
33 stateless_buffer: Vec::new(),
34 }
35 }
36}
37
38impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
39where
40 W: io::Write,
41 TValueWriter: value::ValueWriter,
42{
43 pub fn flush_block(&mut self) -> io::Result<Option<Range<usize>>> {
44 if self.block.is_empty() {
45 return Ok(None);
46 }
47 let start_offset = self.write.written_bytes() as usize;
48 let buffer: &mut Vec<u8> = &mut self.stateless_buffer;
49 self.value_writer.serialize_block(buffer);
50 self.value_writer.clear();
51 let block_len = buffer.len() + self.block.len();
52 self.write.write_all(&(block_len as u32).to_le_bytes())?;
53 self.write.write_all(&buffer[..])?;
54 self.write.write_all(&self.block[..])?;
55 let end_offset = self.write.written_bytes() as usize;
56 self.block.clear();
57 buffer.clear();
58 Ok(Some(start_offset..end_offset))
59 }
60
61 fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) {
62 if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS {
63 let b = (keep_len | add_len << 4) as u8;
64 self.block.extend_from_slice(&[b])
65 } else {
66 let mut buf = [VINT_MODE; 20];
67 let mut len = 1 + vint::serialize(keep_len as u64, &mut buf[1..]);
68 len += vint::serialize(add_len as u64, &mut buf[len..]);
69 self.block.extend_from_slice(&buf[..len])
70 }
71 }
72
73 pub(crate) fn write_suffix(&mut self, common_prefix_len: usize, suffix: &[u8]) {
74 let keep_len = common_prefix_len;
75 let add_len = suffix.len();
76 self.encode_keep_add(keep_len, add_len);
77 self.block.extend_from_slice(suffix);
78 }
79
80 pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) {
81 self.value_writer.write(value);
82 }
83
84 pub fn flush_block_if_required(&mut self) -> io::Result<Option<Range<usize>>> {
85 if self.block.len() > BLOCK_LEN {
86 return self.flush_block();
87 }
88 Ok(None)
89 }
90
91 pub fn finish(self) -> CountingWriter<BufWriter<W>> {
92 self.write
93 }
94}
95
96pub struct DeltaReader<'a, TValueReader> {
97 common_prefix_len: usize,
98 suffix_range: Range<usize>,
99 value_reader: TValueReader,
100 block_reader: BlockReader<'a>,
101 idx: usize,
102}
103
104impl<'a, TValueReader> DeltaReader<'a, TValueReader>
105where TValueReader: value::ValueReader
106{
107 pub fn new<R: io::Read + 'a>(reader: R) -> Self {
108 DeltaReader {
109 idx: 0,
110 common_prefix_len: 0,
111 suffix_range: 0..0,
112 value_reader: TValueReader::default(),
113 block_reader: BlockReader::new(Box::new(reader)),
114 }
115 }
116
117 pub fn empty() -> Self {
118 DeltaReader::new(&b""[..])
119 }
120
121 fn deserialize_vint(&mut self) -> u64 {
122 self.block_reader.deserialize_u64()
123 }
124
125 fn read_keep_add(&mut self) -> Option<(usize, usize)> {
126 let b = {
127 let buf = &self.block_reader.buffer();
128 if buf.is_empty() {
129 return None;
130 }
131 buf[0]
132 };
133 self.block_reader.advance(1);
134 match b {
135 VINT_MODE => {
136 let keep = self.deserialize_vint() as usize;
137 let add = self.deserialize_vint() as usize;
138 Some((keep, add))
139 }
140 b => {
141 let keep = (b & 0b1111) as usize;
142 let add = (b >> 4) as usize;
143 Some((keep, add))
144 }
145 }
146 }
147
148 fn read_delta_key(&mut self) -> bool {
149 let Some((keep, add)) = self.read_keep_add() else {
150 return false;
151 };
152 self.common_prefix_len = keep;
153 let suffix_start = self.block_reader.offset();
154 self.suffix_range = suffix_start..(suffix_start + add);
155 self.block_reader.advance(add);
156 true
157 }
158
159 pub fn advance(&mut self) -> io::Result<bool> {
160 if self.block_reader.buffer().is_empty() {
161 if !self.block_reader.read_block()? {
162 return Ok(false);
163 }
164 let consumed_len = self.value_reader.load(self.block_reader.buffer())?;
165 self.block_reader.advance(consumed_len);
166 self.idx = 0;
167 } else {
168 self.idx += 1;
169 }
170 if !self.read_delta_key() {
171 return Ok(false);
172 }
173 Ok(true)
174 }
175
176 #[inline(always)]
177 pub fn common_prefix_len(&self) -> usize {
178 self.common_prefix_len
179 }
180
181 #[inline(always)]
182 pub fn suffix(&self) -> &[u8] {
183 self.block_reader.buffer_from_to(self.suffix_range.clone())
184 }
185
186 #[inline(always)]
187 pub fn value(&self) -> &TValueReader::Value {
188 self.value_reader.value(self.idx)
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use super::DeltaReader;
195 use crate::value::U64MonotonicValueReader;
196
197 #[test]
198 fn test_empty() {
199 let mut delta_reader: DeltaReader<U64MonotonicValueReader> = DeltaReader::empty();
200 assert!(!delta_reader.advance().unwrap());
201 }
202}