base_d/encoders/streaming/
encoder.rs1use crate::core::dictionary::Dictionary;
2use crate::features::compression::CompressionAlgorithm;
3use crate::features::hashing::HashAlgorithm;
4use std::io::{Read, Write};
5
6use super::hasher::{create_hasher_writer, HasherWriter};
7
8const CHUNK_SIZE: usize = 4096; pub struct StreamingEncoder<'a, W: Write> {
16 dictionary: &'a Dictionary,
17 writer: W,
18 compress_algo: Option<CompressionAlgorithm>,
19 compress_level: u32,
20 hash_algo: Option<HashAlgorithm>,
21 xxhash_config: crate::features::hashing::XxHashConfig,
22}
23
24impl<'a, W: Write> StreamingEncoder<'a, W> {
25 pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32 StreamingEncoder {
33 dictionary,
34 writer,
35 compress_algo: None,
36 compress_level: 6,
37 hash_algo: None,
38 xxhash_config: crate::features::hashing::XxHashConfig::default(),
39 }
40 }
41
42 pub fn with_compression(mut self, algo: CompressionAlgorithm, level: u32) -> Self {
44 self.compress_algo = Some(algo);
45 self.compress_level = level;
46 self
47 }
48
49 pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
51 self.hash_algo = Some(algo);
52 self
53 }
54
55 pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
57 self.xxhash_config = config;
58 self
59 }
60
61 pub fn encode<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
69 if let Some(algo) = self.compress_algo {
71 return self.encode_with_compression(reader, algo);
72 }
73
74 let hash = match self.dictionary.mode() {
76 crate::core::config::EncodingMode::Chunked => self.encode_chunked(reader)?,
77 crate::core::config::EncodingMode::ByteRange => self.encode_byte_range(reader)?,
78 crate::core::config::EncodingMode::BaseConversion => {
79 let mut buffer = Vec::new();
81 reader.read_to_end(&mut buffer)?;
82
83 let hash = self
84 .hash_algo
85 .map(|algo| crate::features::hashing::hash(&buffer, algo));
86
87 let encoded = crate::encoders::algorithms::math::encode(&buffer, self.dictionary);
88 self.writer.write_all(encoded.as_bytes())?;
89 hash
90 }
91 };
92
93 Ok(hash)
94 }
95
96 fn encode_with_compression<R: Read>(
98 &mut self,
99 reader: &mut R,
100 algo: CompressionAlgorithm,
101 ) -> std::io::Result<Option<Vec<u8>>> {
102 use std::io::Cursor;
103
104 let mut compressed_data = Vec::new();
106 let hash = self.compress_stream(reader, &mut compressed_data, algo)?;
107
108 let mut cursor = Cursor::new(compressed_data);
110 match self.dictionary.mode() {
111 crate::core::config::EncodingMode::Chunked => {
112 self.encode_chunked_no_hash(&mut cursor)?;
113 }
114 crate::core::config::EncodingMode::ByteRange => {
115 self.encode_byte_range_no_hash(&mut cursor)?;
116 }
117 crate::core::config::EncodingMode::BaseConversion => {
118 let buffer = cursor.into_inner();
119 let encoded = crate::encoders::algorithms::math::encode(&buffer, self.dictionary);
120 self.writer.write_all(encoded.as_bytes())?;
121 }
122 }
123
124 Ok(hash)
125 }
126
127 fn compress_stream<R: Read>(
129 &mut self,
130 reader: &mut R,
131 output: &mut Vec<u8>,
132 algo: CompressionAlgorithm,
133 ) -> std::io::Result<Option<Vec<u8>>> {
134 use flate2::write::GzEncoder;
135 use xz2::write::XzEncoder;
136
137 let hasher = self
138 .hash_algo
139 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
140
141 match algo {
142 CompressionAlgorithm::Gzip => {
143 let mut encoder =
144 GzEncoder::new(output, flate2::Compression::new(self.compress_level));
145 let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
146 encoder.finish()?;
147 Ok(hash)
148 }
149 CompressionAlgorithm::Zstd => {
150 let mut encoder =
151 zstd::stream::write::Encoder::new(output, self.compress_level as i32)
152 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
153 let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
154 encoder.finish()?;
155 Ok(hash)
156 }
157 CompressionAlgorithm::Brotli => {
158 let mut encoder =
159 brotli::CompressorWriter::new(output, 4096, self.compress_level, 22);
160 let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
161 Ok(hash)
162 }
163 CompressionAlgorithm::Lzma => {
164 let mut encoder = XzEncoder::new(output, self.compress_level);
165 let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
166 encoder.finish()?;
167 Ok(hash)
168 }
169 CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
170 let mut buffer = Vec::new();
173 reader.read_to_end(&mut buffer)?;
174
175 let hash = self
176 .hash_algo
177 .map(|algo| crate::features::hashing::hash(&buffer, algo));
178
179 let compressed = match algo {
180 CompressionAlgorithm::Lz4 => lz4::block::compress(&buffer, None, false)
181 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
182 CompressionAlgorithm::Snappy => {
183 let mut encoder = snap::raw::Encoder::new();
184 encoder
185 .compress_vec(&buffer)
186 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
187 }
188 _ => unreachable!(),
189 };
190 output.extend_from_slice(&compressed);
191 Ok(hash)
192 }
193 }
194 }
195
196 fn copy_with_hash<R: Read>(
197 reader: &mut R,
198 writer: &mut impl Write,
199 mut hasher: Option<HasherWriter>,
200 ) -> std::io::Result<Option<Vec<u8>>> {
201 let mut buffer = vec![0u8; CHUNK_SIZE];
202
203 loop {
204 let bytes_read = reader.read(&mut buffer)?;
205 if bytes_read == 0 {
206 break;
207 }
208
209 let chunk = &buffer[..bytes_read];
210 if let Some(ref mut h) = hasher {
211 h.update(chunk);
212 }
213 writer.write_all(chunk)?;
214 }
215
216 Ok(hasher.map(|h| h.finalize()))
217 }
218
219 fn encode_chunked<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
220 let base = self.dictionary.base();
221 let bits_per_char = (base as f64).log2() as usize;
222 let bytes_per_group = bits_per_char;
223
224 let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
226 let mut buffer = vec![0u8; aligned_chunk_size];
227
228 let mut hasher = self
229 .hash_algo
230 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
231
232 loop {
233 let bytes_read = reader.read(&mut buffer)?;
234 if bytes_read == 0 {
235 break;
236 }
237
238 let chunk = &buffer[..bytes_read];
239 if let Some(ref mut h) = hasher {
240 h.update(chunk);
241 }
242
243 let encoded =
244 crate::encoders::algorithms::chunked::encode_chunked(chunk, self.dictionary);
245 self.writer.write_all(encoded.as_bytes())?;
246 }
247
248 Ok(hasher.map(|h| h.finalize()))
249 }
250
251 fn encode_chunked_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
252 let base = self.dictionary.base();
253 let bits_per_char = (base as f64).log2() as usize;
254 let bytes_per_group = bits_per_char;
255
256 let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
257 let mut buffer = vec![0u8; aligned_chunk_size];
258
259 loop {
260 let bytes_read = reader.read(&mut buffer)?;
261 if bytes_read == 0 {
262 break;
263 }
264
265 let encoded = crate::encoders::algorithms::chunked::encode_chunked(
266 &buffer[..bytes_read],
267 self.dictionary,
268 );
269 self.writer.write_all(encoded.as_bytes())?;
270 }
271
272 Ok(())
273 }
274
275 fn encode_byte_range<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
276 let mut buffer = vec![0u8; CHUNK_SIZE];
277 let mut hasher = self
278 .hash_algo
279 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
280
281 loop {
282 let bytes_read = reader.read(&mut buffer)?;
283 if bytes_read == 0 {
284 break;
285 }
286
287 let chunk = &buffer[..bytes_read];
288 if let Some(ref mut h) = hasher {
289 h.update(chunk);
290 }
291
292 let encoded =
293 crate::encoders::algorithms::byte_range::encode_byte_range(chunk, self.dictionary);
294 self.writer.write_all(encoded.as_bytes())?;
295 }
296
297 Ok(hasher.map(|h| h.finalize()))
298 }
299
300 fn encode_byte_range_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
301 let mut buffer = vec![0u8; CHUNK_SIZE];
302
303 loop {
304 let bytes_read = reader.read(&mut buffer)?;
305 if bytes_read == 0 {
306 break;
307 }
308
309 let encoded = crate::encoders::algorithms::byte_range::encode_byte_range(
310 &buffer[..bytes_read],
311 self.dictionary,
312 );
313 self.writer.write_all(encoded.as_bytes())?;
314 }
315
316 Ok(())
317 }
318}