base_d/encoders/streaming/encoder.rs

use crate::core::dictionary::Dictionary;
use crate::features::compression::CompressionAlgorithm;
use crate::features::hashing::HashAlgorithm;
use std::io::{Read, Write};

use super::hasher::{HasherWriter, create_hasher_writer};
/// Size of the read buffer used while streaming.
const CHUNK_SIZE: usize = 4096;

/// Streams data from a `Read` source to a `Write` sink, base-encoding it with
/// the given dictionary and optionally compressing and/or hashing it on the way.
pub struct StreamingEncoder<'a, W: Write> {
    /// Dictionary that defines the target base and encoding mode.
    dictionary: &'a Dictionary,
    /// Destination for the encoded output.
    writer: W,
    /// Optional compression applied before encoding.
    compress_algo: Option<CompressionAlgorithm>,
    /// Compression level (defaults to 6).
    compress_level: u32,
    /// Optional hash computed over the raw input.
    hash_algo: Option<HashAlgorithm>,
    /// Configuration used when xxHash-based hashers are created.
    xxhash_config: crate::features::hashing::XxHashConfig,
}

impl<'a, W: Write> StreamingEncoder<'a, W> {
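    /// Creates an encoder with no compression and no hashing; the compression
    /// level defaults to 6 until `with_compression` overrides it.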
    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
        StreamingEncoder {
            dictionary,
            writer,
            compress_algo: None,
            compress_level: 6,
            hash_algo: None,
            xxhash_config: crate::features::hashing::XxHashConfig::default(),
        }
    }

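    /// Enables compression of the input with the given algorithm and level
    /// before it is base-encoded.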
    pub fn with_compression(mut self, algo: CompressionAlgorithm, level: u32) -> Self {
        self.compress_algo = Some(algo);
        self.compress_level = level;
        self
    }

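    /// Enables hashing of the raw (uncompressed) input with the given algorithm.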
    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
        self.hash_algo = Some(algo);
        self
    }

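    /// Overrides the xxHash configuration used when hashers are created.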
    pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
        self.xxhash_config = config;
        self
    }

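    /// Encodes everything read from `reader` into the underlying writer,
    /// dispatching on the dictionary's encoding mode. Returns the hash of the
    /// raw input if hashing was enabled, otherwise `None`.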
    pub fn encode<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
        if let Some(algo) = self.compress_algo {
            return self.encode_with_compression(reader, algo);
        }

        let hash = match self.dictionary.mode() {
            crate::core::config::EncodingMode::Chunked => self.encode_chunked(reader)?,
            crate::core::config::EncodingMode::ByteRange => self.encode_byte_range(reader)?,
            crate::core::config::EncodingMode::Radix => {
                // Radix encoding works on the input as a single value, so it is
                // buffered rather than streamed.
                let mut buffer = Vec::new();
                reader.read_to_end(&mut buffer)?;

                let hash = self
                    .hash_algo
                    .map(|algo| crate::features::hashing::hash(&buffer, algo));

                let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
                self.writer.write_all(encoded.as_bytes())?;
                hash
            }
        };

        Ok(hash)
    }

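    /// Compresses the input into an in-memory buffer (hashing the raw bytes as
    /// they pass through), then base-encodes the compressed buffer.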
    fn encode_with_compression<R: Read>(
        &mut self,
        reader: &mut R,
        algo: CompressionAlgorithm,
    ) -> std::io::Result<Option<Vec<u8>>> {
        use std::io::Cursor;

        let mut compressed_data = Vec::new();
        let hash = self.compress_stream(reader, &mut compressed_data, algo)?;

        let mut cursor = Cursor::new(compressed_data);
        match self.dictionary.mode() {
            crate::core::config::EncodingMode::Chunked => {
                self.encode_chunked_no_hash(&mut cursor)?;
            }
            crate::core::config::EncodingMode::ByteRange => {
                self.encode_byte_range_no_hash(&mut cursor)?;
            }
            crate::core::config::EncodingMode::Radix => {
                let buffer = cursor.into_inner();
                let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
                self.writer.write_all(encoded.as_bytes())?;
            }
        }

        Ok(hash)
    }

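    /// Compresses `reader` into `output` with the selected algorithm while
    /// hashing the uncompressed bytes. Gzip, Zstd, Brotli, and LZMA are
    /// streamed; LZ4 and Snappy compress a fully buffered copy of the input.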
    fn compress_stream<R: Read>(
        &mut self,
        reader: &mut R,
        output: &mut Vec<u8>,
        algo: CompressionAlgorithm,
    ) -> std::io::Result<Option<Vec<u8>>> {
        use flate2::write::GzEncoder;
        use xz2::write::XzEncoder;

        let hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));

        match algo {
            CompressionAlgorithm::Gzip => {
                let mut encoder =
                    GzEncoder::new(output, flate2::Compression::new(self.compress_level));
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                encoder.finish()?;
                Ok(hash)
            }
            CompressionAlgorithm::Zstd => {
                let mut encoder =
                    zstd::stream::write::Encoder::new(output, self.compress_level as i32)
                        .map_err(std::io::Error::other)?;
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                encoder.finish()?;
                Ok(hash)
            }
            CompressionAlgorithm::Brotli => {
                let mut encoder =
                    brotli::CompressorWriter::new(output, 4096, self.compress_level, 22);
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                Ok(hash)
            }
            CompressionAlgorithm::Lzma => {
                let mut encoder = XzEncoder::new(output, self.compress_level);
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                encoder.finish()?;
                Ok(hash)
            }
            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
                // The block-based LZ4 and Snappy APIs need the whole input in memory.
                let mut buffer = Vec::new();
                reader.read_to_end(&mut buffer)?;

                let hash = self
                    .hash_algo
                    .map(|algo| crate::features::hashing::hash(&buffer, algo));

                let compressed = match algo {
                    CompressionAlgorithm::Lz4 => {
                        lz4::block::compress(&buffer, None, false).map_err(std::io::Error::other)?
                    }
                    CompressionAlgorithm::Snappy => {
                        let mut encoder = snap::raw::Encoder::new();
                        encoder
                            .compress_vec(&buffer)
                            .map_err(std::io::Error::other)?
                    }
                    _ => unreachable!(),
                };
                output.extend_from_slice(&compressed);
                Ok(hash)
            }
        }
    }

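    /// Copies `reader` into `writer` in `CHUNK_SIZE` chunks, feeding each chunk
    /// to the optional hasher. Returns the finalized hash, if any.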
    fn copy_with_hash<R: Read>(
        reader: &mut R,
        writer: &mut impl Write,
        mut hasher: Option<HasherWriter>,
    ) -> std::io::Result<Option<Vec<u8>>> {
        let mut buffer = vec![0u8; CHUNK_SIZE];

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }

            let chunk = &buffer[..bytes_read];
            if let Some(ref mut h) = hasher {
                h.update(chunk);
            }
            writer.write_all(chunk)?;
        }

        Ok(hasher.map(|h| h.finalize()))
    }

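    /// Streams the input through the chunked encoder, hashing the raw bytes as
    /// they are read.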
    fn encode_chunked<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
        let base = self.dictionary.base();
        let bits_per_char = (base as f64).log2() as usize;
        // For a power-of-two base, a group of `bits_per_char` bytes holds
        // 8 * bits_per_char bits, i.e. a whole number of output characters.
        let bytes_per_group = bits_per_char;

        // Read in multiples of the group size so chunk boundaries fall on
        // character boundaries.
        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
        let mut buffer = vec![0u8; aligned_chunk_size];

        let mut hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }

            let chunk = &buffer[..bytes_read];
            if let Some(ref mut h) = hasher {
                h.update(chunk);
            }

            let encoded =
                crate::encoders::algorithms::chunked::encode_chunked(chunk, self.dictionary);
            self.writer.write_all(encoded.as_bytes())?;
        }

        Ok(hasher.map(|h| h.finalize()))
    }

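    /// Chunked encoding without hashing, used on the already-compressed buffer
    /// produced by `compress_stream`.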
    fn encode_chunked_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
        let base = self.dictionary.base();
        let bits_per_char = (base as f64).log2() as usize;
        let bytes_per_group = bits_per_char;

        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
        let mut buffer = vec![0u8; aligned_chunk_size];

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }

            let encoded = crate::encoders::algorithms::chunked::encode_chunked(
                &buffer[..bytes_read],
                self.dictionary,
            );
            self.writer.write_all(encoded.as_bytes())?;
        }

        Ok(())
    }

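    /// Streams the input through the byte-range encoder, hashing the raw bytes
    /// as they are read.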
    fn encode_byte_range<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
        let mut buffer = vec![0u8; CHUNK_SIZE];
        let mut hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }

            let chunk = &buffer[..bytes_read];
            if let Some(ref mut h) = hasher {
                h.update(chunk);
            }

            let encoded =
                crate::encoders::algorithms::byte_range::encode_byte_range(chunk, self.dictionary);
            self.writer.write_all(encoded.as_bytes())?;
        }

        Ok(hasher.map(|h| h.finalize()))
    }

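    /// Byte-range encoding without hashing; the compressed-input counterpart of
    /// `encode_byte_range`.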
    fn encode_byte_range_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
        let mut buffer = vec![0u8; CHUNK_SIZE];

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }

            let encoded = crate::encoders::algorithms::byte_range::encode_byte_range(
                &buffer[..bytes_read],
                self.dictionary,
            );
            self.writer.write_all(encoded.as_bytes())?;
        }

        Ok(())
    }
}
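
// A minimal usage sketch (assumptions: the `encode_with_gzip` helper name is
// ours, and a `Dictionary` value is constructed elsewhere, since this file does
// not show how). It only uses the builder methods defined above.
//
//     fn encode_with_gzip<R: Read, W: Write>(
//         dict: &Dictionary,
//         input: &mut R,
//         output: W,
//     ) -> std::io::Result<Option<Vec<u8>>> {
//         let mut encoder = StreamingEncoder::new(dict, output)
//             .with_compression(CompressionAlgorithm::Gzip, 6);
//         // Returns the hash of the raw input only when hashing is enabled.
//         encoder.encode(input)
//     }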