base_d/encoders/streaming/
encoder.rs1use crate::core::dictionary::Dictionary;
2use crate::features::compression::CompressionAlgorithm;
3use crate::features::hashing::HashAlgorithm;
4use std::io::{Read, Write};
5
6use super::hasher::{HasherWriter, create_hasher_writer};
7
8const CHUNK_SIZE: usize = 4096; pub struct StreamingEncoder<'a, W: Write> {
16 dictionary: &'a Dictionary,
17 writer: W,
18 compress_algo: Option<CompressionAlgorithm>,
19 compress_level: u32,
20 hash_algo: Option<HashAlgorithm>,
21 xxhash_config: crate::features::hashing::XxHashConfig,
22}
23
24impl<'a, W: Write> StreamingEncoder<'a, W> {
25 pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32 StreamingEncoder {
33 dictionary,
34 writer,
35 compress_algo: None,
36 compress_level: 6,
37 hash_algo: None,
38 xxhash_config: crate::features::hashing::XxHashConfig::default(),
39 }
40 }
41
42 pub fn with_compression(mut self, algo: CompressionAlgorithm, level: u32) -> Self {
44 self.compress_algo = Some(algo);
45 self.compress_level = level;
46 self
47 }
48
49 pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
51 self.hash_algo = Some(algo);
52 self
53 }
54
55 pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
57 self.xxhash_config = config;
58 self
59 }
60
61 pub fn encode<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
69 if let Some(algo) = self.compress_algo {
71 return self.encode_with_compression(reader, algo);
72 }
73
74 let hash = match self.dictionary.mode() {
76 crate::core::config::EncodingMode::Chunked => self.encode_chunked(reader)?,
77 crate::core::config::EncodingMode::ByteRange => self.encode_byte_range(reader)?,
78 crate::core::config::EncodingMode::Radix => {
79 let mut buffer = Vec::new();
81 reader.read_to_end(&mut buffer)?;
82
83 let hash = self
84 .hash_algo
85 .map(|algo| crate::features::hashing::hash(&buffer, algo));
86
87 let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
88 self.writer.write_all(encoded.as_bytes())?;
89 hash
90 }
91 };
92
93 Ok(hash)
94 }
95
96 fn encode_with_compression<R: Read>(
98 &mut self,
99 reader: &mut R,
100 algo: CompressionAlgorithm,
101 ) -> std::io::Result<Option<Vec<u8>>> {
102 use std::io::Cursor;
103
104 let mut compressed_data = Vec::new();
106 let hash = self.compress_stream(reader, &mut compressed_data, algo)?;
107
108 let mut cursor = Cursor::new(compressed_data);
110 match self.dictionary.mode() {
111 crate::core::config::EncodingMode::Chunked => {
112 self.encode_chunked_no_hash(&mut cursor)?;
113 }
114 crate::core::config::EncodingMode::ByteRange => {
115 self.encode_byte_range_no_hash(&mut cursor)?;
116 }
117 crate::core::config::EncodingMode::Radix => {
118 let buffer = cursor.into_inner();
119 let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
120 self.writer.write_all(encoded.as_bytes())?;
121 }
122 }
123
124 Ok(hash)
125 }
126
127 fn compress_stream<R: Read>(
129 &mut self,
130 reader: &mut R,
131 output: &mut Vec<u8>,
132 algo: CompressionAlgorithm,
133 ) -> std::io::Result<Option<Vec<u8>>> {
134 use flate2::write::GzEncoder;
135
136 let hasher = self
137 .hash_algo
138 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
139
140 match algo {
141 CompressionAlgorithm::Gzip => {
142 let mut encoder =
143 GzEncoder::new(output, flate2::Compression::new(self.compress_level));
144 let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
145 encoder.finish()?;
146 Ok(hash)
147 }
148 #[cfg(feature = "native-compression")]
149 CompressionAlgorithm::Zstd => {
150 let mut encoder =
151 zstd::stream::write::Encoder::new(output, self.compress_level as i32)
152 .map_err(std::io::Error::other)?;
153 let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
154 encoder.finish()?;
155 Ok(hash)
156 }
157 #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
158 CompressionAlgorithm::Zstd => {
159 Err(std::io::Error::other(
161 "Zstd compression not supported in WASM (ruzstd is decode-only)",
162 ))
163 }
164 CompressionAlgorithm::Brotli => {
165 let mut encoder =
166 brotli::CompressorWriter::new(output, 4096, self.compress_level, 22);
167 let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
168 Ok(hash)
169 }
170 #[cfg(feature = "native-compression")]
171 CompressionAlgorithm::Lzma => {
172 use xz2::write::XzEncoder;
173 let mut encoder = XzEncoder::new(output, self.compress_level);
174 let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
175 encoder.finish()?;
176 Ok(hash)
177 }
178 #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
179 CompressionAlgorithm::Lzma => {
180 let mut buffer = Vec::new();
182 reader.read_to_end(&mut buffer)?;
183
184 let hash = self
185 .hash_algo
186 .map(|algo| crate::features::hashing::hash(&buffer, algo));
187
188 use std::io::Cursor;
189 lzma_rs::lzma_compress(&mut Cursor::new(&buffer), output)
190 .map_err(std::io::Error::other)?;
191 Ok(hash)
192 }
193 CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
194 let mut buffer = Vec::new();
197 reader.read_to_end(&mut buffer)?;
198
199 let hash = self
200 .hash_algo
201 .map(|algo| crate::features::hashing::hash(&buffer, algo));
202
203 let compressed = match algo {
204 #[cfg(feature = "native-compression")]
205 CompressionAlgorithm::Lz4 => {
206 lz4::block::compress(&buffer, None, false).map_err(std::io::Error::other)?
207 }
208 #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
209 CompressionAlgorithm::Lz4 => lz4_flex::compress_prepend_size(&buffer),
210 CompressionAlgorithm::Snappy => {
211 let mut encoder = snap::raw::Encoder::new();
212 encoder
213 .compress_vec(&buffer)
214 .map_err(std::io::Error::other)?
215 }
216 _ => unreachable!(),
217 };
218 output.extend_from_slice(&compressed);
219 Ok(hash)
220 }
221 }
222 }
223
224 fn copy_with_hash<R: Read>(
225 reader: &mut R,
226 writer: &mut impl Write,
227 mut hasher: Option<HasherWriter>,
228 ) -> std::io::Result<Option<Vec<u8>>> {
229 let mut buffer = vec![0u8; CHUNK_SIZE];
230
231 loop {
232 let bytes_read = reader.read(&mut buffer)?;
233 if bytes_read == 0 {
234 break;
235 }
236
237 let chunk = &buffer[..bytes_read];
238 if let Some(ref mut h) = hasher {
239 h.update(chunk);
240 }
241 writer.write_all(chunk)?;
242 }
243
244 Ok(hasher.map(|h| h.finalize()))
245 }
246
247 fn encode_chunked<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
248 let base = self.dictionary.base();
249 let bits_per_char = (base as f64).log2() as usize;
250 let bytes_per_group = bits_per_char;
251
252 let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
254 let mut buffer = vec![0u8; aligned_chunk_size];
255
256 let mut hasher = self
257 .hash_algo
258 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
259
260 loop {
261 let bytes_read = reader.read(&mut buffer)?;
262 if bytes_read == 0 {
263 break;
264 }
265
266 let chunk = &buffer[..bytes_read];
267 if let Some(ref mut h) = hasher {
268 h.update(chunk);
269 }
270
271 let encoded =
272 crate::encoders::algorithms::chunked::encode_chunked(chunk, self.dictionary);
273 self.writer.write_all(encoded.as_bytes())?;
274 }
275
276 Ok(hasher.map(|h| h.finalize()))
277 }
278
279 fn encode_chunked_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
280 let base = self.dictionary.base();
281 let bits_per_char = (base as f64).log2() as usize;
282 let bytes_per_group = bits_per_char;
283
284 let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
285 let mut buffer = vec![0u8; aligned_chunk_size];
286
287 loop {
288 let bytes_read = reader.read(&mut buffer)?;
289 if bytes_read == 0 {
290 break;
291 }
292
293 let encoded = crate::encoders::algorithms::chunked::encode_chunked(
294 &buffer[..bytes_read],
295 self.dictionary,
296 );
297 self.writer.write_all(encoded.as_bytes())?;
298 }
299
300 Ok(())
301 }
302
303 fn encode_byte_range<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
304 let mut buffer = vec![0u8; CHUNK_SIZE];
305 let mut hasher = self
306 .hash_algo
307 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
308
309 loop {
310 let bytes_read = reader.read(&mut buffer)?;
311 if bytes_read == 0 {
312 break;
313 }
314
315 let chunk = &buffer[..bytes_read];
316 if let Some(ref mut h) = hasher {
317 h.update(chunk);
318 }
319
320 let encoded =
321 crate::encoders::algorithms::byte_range::encode_byte_range(chunk, self.dictionary);
322 self.writer.write_all(encoded.as_bytes())?;
323 }
324
325 Ok(hasher.map(|h| h.finalize()))
326 }
327
328 fn encode_byte_range_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
329 let mut buffer = vec![0u8; CHUNK_SIZE];
330
331 loop {
332 let bytes_read = reader.read(&mut buffer)?;
333 if bytes_read == 0 {
334 break;
335 }
336
337 let encoded = crate::encoders::algorithms::byte_range::encode_byte_range(
338 &buffer[..bytes_read],
339 self.dictionary,
340 );
341 self.writer.write_all(encoded.as_bytes())?;
342 }
343
344 Ok(())
345 }
346}