async_sevenz/
writer.rs

1mod counting_writer;
2#[cfg(not(target_arch = "wasm32"))]
3mod lazy_file_reader;
4mod pack_info;
5mod seq_reader;
6mod source_reader;
7mod unpack_info;
8
9#[cfg(not(target_arch = "wasm32"))]
10use futures_lite::io::Cursor;
11use futures_lite::io::{
12    AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, SeekFrom,
13};
14use std::{cell::Cell, rc::Rc, sync::Arc};
15
16pub(crate) use counting_writer::CountingWriter;
17use crc32fast::Hasher;
18
19#[cfg(not(target_arch = "wasm32"))]
20pub(crate) use self::lazy_file_reader::LazyFileReader;
21pub(crate) use self::seq_reader::SeqReader;
22pub use self::source_reader::SourceReader;
23use self::{pack_info::PackInfo, unpack_info::UnpackInfo};
24use crate::{ArchiveEntry, AutoFinish, AutoFinisher, Error, archive::*, bitset::BitSet, encoder};
25
// Generates an async method that writes one optional per-file property block
// (timestamps or attributes) into the header.
//
// Arguments: the method name, the property id byte, the boolean field that says whether
// an entry carries the value, and the field holding the value. The first arm stores each
// value as a little-endian u64 (converted with `.into()`); the arm ending in `write_u32`
// stores it as a little-endian u32.
//
// Nothing is written when no entry carries the value. Otherwise the output is: the id
// byte, the encoded payload length, then the payload. The payload starts with `1` when
// every entry has the value, or `0` followed by a bit vector of the entries that do; a
// zero byte follows, then the values themselves.
macro_rules! write_times {
    ($fn_name:tt, $nid:expr, $has_time:tt, $time:tt) => {
        async fn $fn_name<H: AsyncWrite + Unpin>(&self, header: &mut H) -> std::io::Result<()> {
            let total = self.files.len();
            let defined = self.files.iter().filter(|f| f.$has_time).count();
            if defined == 0 {
                return Ok(());
            }

            AsyncWriteExt::write_all(header, &[$nid]).await?;
            let mut payload: Vec<u8> = Vec::with_capacity(128);
            if defined == total {
                payload.push(1);
            } else {
                payload.push(0);
                let mut defined_bits = BitSet::with_capacity(total);
                for (idx, f) in self.files.iter().enumerate() {
                    if f.$has_time {
                        defined_bits.insert(idx);
                    }
                }
                payload.extend_from_slice(&bitset_to_bytes(&defined_bits, total));
            }
            payload.push(0);
            for f in self.files.iter().filter(|f| f.$has_time) {
                vec_push_le_u64(&mut payload, (f.$time).into());
            }

            write_encoded_u64(header, payload.len() as u64).await?;
            AsyncWriteExt::write_all(header, &payload).await
        }
    };
    ($fn_name:tt, $nid:expr, $has_time:tt, $time:tt, write_u32) => {
        async fn $fn_name<H: AsyncWrite + Unpin>(&self, header: &mut H) -> std::io::Result<()> {
            let total = self.files.len();
            let defined = self.files.iter().filter(|f| f.$has_time).count();
            if defined == 0 {
                return Ok(());
            }

            AsyncWriteExt::write_all(header, &[$nid]).await?;
            let mut payload: Vec<u8> = Vec::with_capacity(128);
            if defined == total {
                payload.push(1);
            } else {
                payload.push(0);
                let mut defined_bits = BitSet::with_capacity(total);
                for (idx, f) in self.files.iter().enumerate() {
                    if f.$has_time {
                        defined_bits.insert(idx);
                    }
                }
                payload.extend_from_slice(&bitset_to_bytes(&defined_bits, total));
            }
            payload.push(0);
            for f in self.files.iter().filter(|f| f.$has_time) {
                vec_push_le_u32(&mut payload, f.$time);
            }

            write_encoded_u64(header, payload.len() as u64).await?;
            AsyncWriteExt::write_all(header, &payload).await
        }
    };
}
100
/// Module-local shorthand for a `Result` whose error type is the crate's [`Error`].
type Result<T> = std::result::Result<T, Error>;
102
/// Writes a 7z archive file.
///
/// Entries are appended with `push_archive_entry` (one pack per entry) or
/// `push_archive_entries` (several entries in one pack); `finish` writes the header
/// and hands back the underlying writer.
pub struct ArchiveWriter<W: AsyncWrite + AsyncSeek + Unpin> {
    /// Destination the archive bytes are written to.
    output: W,
    /// Entries pushed so far, in push order.
    files: Vec<ArchiveEntry>,
    /// Encoder configurations applied to entry data (a single LZMA2 entry unless overridden).
    content_methods: Arc<Vec<EncoderConfiguration>>,
    /// Record of the packed streams (size and CRC) added by the push methods.
    pack_info: PackInfo,
    /// Record of the methods, unpacked sizes and CRCs added by the push methods.
    unpack_info: UnpackInfo,
    /// When set, the header is also passed through the AES method if one is configured.
    encrypt_header: bool,
}
112
113#[cfg(not(target_arch = "wasm32"))]
114impl ArchiveWriter<Cursor<Vec<u8>>> {
115    /// 创建一个基于内存缓冲的 7z 写入器。
116    ///
117    /// 返回使用 `Vec<u8>` 作为底层存储的 `ArchiveWriter`,适合测试或无需落盘的场景。
118    pub async fn create_in_memory() -> Result<Self> {
119        let cursor = Cursor::new(Vec::<u8>::new());
120        Self::new(cursor).await
121    }
122}
123
124impl<W: AsyncWrite + AsyncSeek + Unpin> ArchiveWriter<W> {
125    /// Prepares writer to write a 7z archive to.
126    pub async fn new(mut writer: W) -> Result<Self> {
127        AsyncSeekExt::seek(&mut writer, SeekFrom::Start(SIGNATURE_HEADER_SIZE)).await?;
128
129        Ok(Self {
130            output: writer,
131            files: Default::default(),
132            content_methods: Arc::new(vec![EncoderConfiguration::new(EncoderMethod::LZMA2)]),
133            pack_info: Default::default(),
134            unpack_info: Default::default(),
135            encrypt_header: true,
136        })
137    }
138
139    /// Returns a wrapper around `self` that will finish the stream on drop.
140    pub fn auto_finish(self) -> AutoFinisher<Self> {
141        AutoFinisher(Some(self))
142    }
143
144    /// Sets the default compression methods to use for entry data. Default is LZMA2.
145    pub fn set_content_methods(&mut self, content_methods: Vec<EncoderConfiguration>) -> &mut Self {
146        if content_methods.is_empty() {
147            return self;
148        }
149        self.content_methods = Arc::new(content_methods);
150        self
151    }
152
    /// Enables or disables encryption of the archive header. Default is `true`.
    ///
    /// The flag only has an effect when the content methods include an AES256-SHA256
    /// configuration; that configuration is then also applied to the encoded header.
    pub fn set_encrypt_header(&mut self, enabled: bool) {
        self.encrypt_header = enabled;
    }
157
158    /// Non-solid compression - Adds an archive `entry` with data from `reader`.
159    ///
160    /// # Example
161    /// ```no_run
162    /// use std::io::Cursor;
163    /// use std::path::Path;
164    /// use async_sevenz::*;
165    /// #[tokio::main]
166    /// async fn main() {
167    ///     let mut sz = ArchiveWriter::create_in_memory().await.expect("create writer ok");
168    ///     let src = Path::new("path/to/source.txt");
169    ///     let name = "source.txt".to_string();
170    ///     let entry = sz
171    ///         .push_archive_entry(
172    ///             ArchiveEntry::from_path(&src, name).await,
173    ///             Some(futures_lite::io::Cursor::new(&b"example"[..])),
174    ///         )
175    ///         .await
176    ///         .expect("ok");
177    ///     let compressed_size = entry.compressed_size;
178    ///     let _cursor = sz.finish().await.expect("done");
179    /// }
180    /// ```
181    pub async fn push_archive_entry<R: AsyncRead + Unpin>(
182        &mut self,
183        mut entry: ArchiveEntry,
184        reader: Option<R>,
185    ) -> Result<&ArchiveEntry> {
186        if !entry.is_directory {
187            if let Some(mut r) = reader {
188                let mut compressed_len = 0;
189                let mut compressed = CompressWrapWriter::new(&mut self.output, &mut compressed_len);
190
191                let mut more_sizes: Vec<Rc<Cell<usize>>> =
192                    Vec::with_capacity(self.content_methods.len() - 1);
193
194                let (crc, size) = {
195                    let mut w = Self::create_writer(
196                        &self.content_methods,
197                        &mut compressed,
198                        &mut more_sizes,
199                    )?;
200                    let mut write_len = 0;
201                    let mut w = CompressWrapWriter::new(&mut w, &mut write_len);
202                    let mut buf = [0u8; 4096];
203                    loop {
204                        let n = AsyncReadExt::read(&mut r, &mut buf).await.map_err(|e| {
205                            Error::io_msg(e, format!("Encode entry:{}", entry.name()))
206                        })?;
207                        if n == 0 {
208                            break;
209                        }
210                        AsyncWriteExt::write_all(&mut w, &buf[..n])
211                            .await
212                            .map_err(|e| {
213                                Error::io_msg(e, format!("Encode entry:{}", entry.name()))
214                            })?;
215                    }
216                    AsyncWriteExt::flush(&mut w)
217                        .await
218                        .map_err(|e| Error::io_msg(e, format!("Encode entry:{}", entry.name())))?;
219                    AsyncWriteExt::write(&mut w, &[])
220                        .await
221                        .map_err(|e| Error::io_msg(e, format!("Encode entry:{}", entry.name())))?;
222
223                    (w.crc_value(), write_len)
224                };
225                let compressed_crc = compressed.crc_value();
226                entry.has_stream = true;
227                entry.size = size as u64;
228                entry.crc = crc as u64;
229                entry.has_crc = true;
230                entry.compressed_crc = compressed_crc as u64;
231                entry.compressed_size = compressed_len as u64;
232                self.pack_info
233                    .add_stream(compressed_len as u64, compressed_crc);
234
235                let mut sizes = Vec::with_capacity(more_sizes.len() + 1);
236                sizes.extend(more_sizes.iter().map(|s| s.get() as u64));
237                sizes.push(size as u64);
238
239                self.unpack_info
240                    .add(self.content_methods.clone(), sizes, crc);
241
242                self.files.push(entry);
243                return Ok(self.files.last().unwrap());
244            }
245        }
246        entry.has_stream = false;
247        entry.size = 0;
248        entry.compressed_size = 0;
249        entry.has_crc = false;
250        self.files.push(entry);
251        Ok(self.files.last().unwrap())
252    }
253
254    /// Solid compression - packs `entries` into one pack.
255    ///
256    /// # Panics
257    /// * If `entries`'s length not equals to `reader.reader_len()`
258    pub async fn push_archive_entries<R: AsyncRead + Unpin>(
259        &mut self,
260        entries: Vec<ArchiveEntry>,
261        reader: Vec<SourceReader<R>>,
262    ) -> Result<&mut Self> {
263        let mut entries = entries;
264        let mut r = SeqReader::new(reader);
265        assert_eq!(r.reader_len(), entries.len());
266        let mut compressed_len = 0;
267        let mut compressed = CompressWrapWriter::new(&mut self.output, &mut compressed_len);
268        let content_methods = &self.content_methods;
269        let mut more_sizes: Vec<Rc<Cell<usize>>> = Vec::with_capacity(content_methods.len() - 1);
270
271        let (crc, size) = {
272            let mut w = Self::create_writer(content_methods, &mut compressed, &mut more_sizes)?;
273            let mut write_len = 0;
274            let mut w = CompressWrapWriter::new(&mut w, &mut write_len);
275            let mut buf = [0u8; 4096];
276
277            fn entries_names(entries: &[ArchiveEntry]) -> String {
278                let mut names = String::with_capacity(512);
279                for ele in entries.iter() {
280                    names.push_str(&ele.name);
281                    names.push(';');
282                    if names.len() > 512 {
283                        break;
284                    }
285                }
286                names
287            }
288
289            loop {
290                let n = AsyncReadExt::read(&mut r, &mut buf).await.map_err(|e| {
291                    Error::io_msg(e, format!("Encode entries:{}", entries_names(&entries)))
292                })?;
293                if n == 0 {
294                    break;
295                }
296                AsyncWriteExt::write_all(&mut w, &buf[..n])
297                    .await
298                    .map_err(|e| {
299                        Error::io_msg(e, format!("Encode entries:{}", entries_names(&entries)))
300                    })?;
301            }
302            AsyncWriteExt::flush(&mut w).await.map_err(|e| {
303                let mut names = String::with_capacity(512);
304                for ele in entries.iter() {
305                    names.push_str(&ele.name);
306                    names.push(';');
307                    if names.len() > 512 {
308                        break;
309                    }
310                }
311                Error::io_msg(e, format!("Encode entry:{names}"))
312            })?;
313            AsyncWriteExt::write(&mut w, &[]).await.map_err(|e| {
314                Error::io_msg(e, format!("Encode entry:{}", entries_names(&entries)))
315            })?;
316
317            (w.crc_value(), write_len)
318        };
319        let compressed_crc = compressed.crc_value();
320        let mut sub_stream_crcs = Vec::with_capacity(entries.len());
321        let mut sub_stream_sizes = Vec::with_capacity(entries.len());
322        for i in 0..entries.len() {
323            let entry = &mut entries[i];
324            let ri = &r[i];
325            entry.crc = ri.crc_value() as u64;
326            entry.size = ri.read_count() as u64;
327            sub_stream_crcs.push(entry.crc as u32);
328            sub_stream_sizes.push(entry.size);
329            entry.has_crc = true;
330        }
331
332        self.pack_info
333            .add_stream(compressed_len as u64, compressed_crc);
334
335        let mut sizes = Vec::with_capacity(more_sizes.len() + 1);
336        sizes.extend(more_sizes.iter().map(|s| s.get() as u64));
337        sizes.push(size as u64);
338
339        self.unpack_info.add_multiple(
340            content_methods.clone(),
341            sizes,
342            crc,
343            entries.len() as u64,
344            sub_stream_sizes,
345            sub_stream_crcs,
346        );
347
348        self.files.extend(entries);
349        Ok(self)
350    }
351
352    fn create_writer<'a, O: AsyncWrite + Unpin + 'a>(
353        methods: &[EncoderConfiguration],
354        out: O,
355        more_sized: &mut Vec<Rc<Cell<usize>>>,
356    ) -> Result<Box<dyn AsyncWrite + Unpin + 'a>> {
357        let mut encoder: Box<dyn AsyncWrite + Unpin> = Box::new(out);
358        let mut first = true;
359        for mc in methods.iter() {
360            if !first {
361                let counting = CountingWriter::new(encoder);
362                more_sized.push(counting.counting());
363                encoder = Box::new(encoder::add_encoder(counting, mc)?);
364            } else {
365                let counting = CountingWriter::new(encoder);
366                encoder = Box::new(encoder::add_encoder(counting, mc)?);
367            }
368            first = false;
369        }
370        Ok(encoder)
371    }
372
373    /// Finishes the compression.
374    pub async fn finish(mut self) -> std::io::Result<W> {
375        let mut cursor = Cursor::new(Vec::with_capacity(64 * 1024));
376        self.write_encoded_header(&mut cursor).await?;
377        let header = cursor.into_inner();
378        let header_pos = self.output.seek(SeekFrom::Current(0)).await?;
379        AsyncWriteExt::write_all(&mut self.output, &header).await?;
380        let crc32 = crc32fast::hash(&header);
381        let mut hh = [0u8; SIGNATURE_HEADER_SIZE as usize];
382        hh[0..SEVEN_Z_SIGNATURE.len()].copy_from_slice(SEVEN_Z_SIGNATURE);
383        hh[6] = 0;
384        hh[7] = 4;
385        hh[8..12].copy_from_slice(&0u32.to_le_bytes());
386        let start_header_offset_le = (header_pos - SIGNATURE_HEADER_SIZE).to_le_bytes();
387        hh[12..20].copy_from_slice(&start_header_offset_le);
388        let start_header_len_le = ((header.len() as u64) & 0xFFFF_FFFF).to_le_bytes();
389        hh[20..28].copy_from_slice(&start_header_len_le);
390        hh[28..32].copy_from_slice(&crc32.to_le_bytes());
391        let crc32 = crc32fast::hash(&hh[12..]);
392        hh[8..12].copy_from_slice(&crc32.to_le_bytes());
393
394        AsyncSeekExt::seek(&mut self.output, SeekFrom::Start(0)).await?;
395        AsyncWriteExt::write_all(&mut self.output, &hh).await?;
396        AsyncWriteExt::flush(&mut self.output).await?;
397        Ok(self.output)
398    }
399
400    async fn write_header<H: AsyncWrite + Unpin>(&mut self, header: &mut H) -> std::io::Result<()> {
401        AsyncWriteExt::write_all(header, &[K_HEADER]).await?;
402        AsyncWriteExt::write_all(header, &[K_MAIN_STREAMS_INFO]).await?;
403        self.write_streams_info(header).await?;
404        self.write_files_info(header).await?;
405        AsyncWriteExt::write_all(header, &[K_END]).await?;
406        Ok(())
407    }
408
409    async fn write_encoded_header<H: AsyncWrite + Unpin>(
410        &mut self,
411        header: &mut H,
412    ) -> std::io::Result<()> {
413        let mut raw_header_cursor = Cursor::new(Vec::with_capacity(64 * 1024));
414        self.write_header(&mut raw_header_cursor).await?;
415        let raw_header = raw_header_cursor.into_inner();
416        let mut pack_info = PackInfo::default();
417
418        let position = self.output.seek(SeekFrom::Current(0)).await?;
419        let pos = position - SIGNATURE_HEADER_SIZE;
420        pack_info.pos = pos;
421
422        let mut more_sizes = vec![];
423        let size = raw_header.len() as u64;
424        let crc32 = crc32fast::hash(&raw_header);
425        let mut methods = vec![];
426
427        if self.encrypt_header {
428            for conf in self.content_methods.iter() {
429                if conf.method.id() == EncoderMethod::AES256_SHA256.id() {
430                    methods.push(conf.clone());
431                    break;
432                }
433            }
434        }
435
436        methods.push(EncoderConfiguration::new(EncoderMethod::LZMA2));
437
438        let methods = Arc::new(methods);
439
440        let mut encoded_cursor = Cursor::new(Vec::with_capacity(size as usize / 2));
441
442        let mut compress_size = 0;
443        let mut compressed = CompressWrapWriter::new(&mut encoded_cursor, &mut compress_size);
444        {
445            let mut encoder = Self::create_writer(&methods, &mut compressed, &mut more_sizes)
446                .map_err(std::io::Error::other)?;
447            AsyncWriteExt::write_all(&mut encoder, &raw_header).await?;
448            AsyncWriteExt::flush(&mut encoder).await?;
449            let _ = AsyncWriteExt::write(&mut encoder, &[]).await?;
450        }
451
452        let compress_crc = compressed.crc_value();
453        let compress_size = *compressed.bytes_written;
454        if compress_size as u64 + 20 >= size {
455            AsyncWriteExt::write_all(header, &raw_header).await?;
456            return Ok(());
457        }
458        let encoded_data = encoded_cursor.into_inner();
459        AsyncWriteExt::write_all(&mut self.output, &encoded_data[..compress_size]).await?;
460
461        pack_info.add_stream(compress_size as u64, compress_crc);
462
463        let mut unpack_info = UnpackInfo::default();
464        let mut sizes = Vec::with_capacity(1 + more_sizes.len());
465        sizes.extend(more_sizes.iter().map(|s| s.get() as u64));
466        sizes.push(size);
467        unpack_info.add(methods, sizes, crc32);
468
469        AsyncWriteExt::write_all(header, &[K_ENCODED_HEADER]).await?;
470
471        pack_info.write_to(header).await?;
472        unpack_info.write_to(header).await?;
473        unpack_info.write_substreams(header).await?;
474
475        AsyncWriteExt::write_all(header, &[K_END]).await?;
476
477        Ok(())
478    }
479
480    async fn write_streams_info<H: AsyncWrite + Unpin>(
481        &mut self,
482        header: &mut H,
483    ) -> std::io::Result<()> {
484        if self.pack_info.len() > 0 {
485            self.pack_info.write_to(header).await?;
486            self.unpack_info.write_to(header).await?;
487        }
488        self.unpack_info.write_substreams(header).await?;
489
490        AsyncWriteExt::write_all(header, &[K_END]).await?;
491        Ok(())
492    }
493
494    async fn write_files_info<H: AsyncWrite + Unpin>(&self, header: &mut H) -> std::io::Result<()> {
495        AsyncWriteExt::write_all(header, &[K_FILES_INFO]).await?;
496        write_encoded_u64(header, self.files.len() as u64).await?;
497        self.write_file_empty_streams(header).await?;
498        self.write_file_empty_files(header).await?;
499        self.write_file_anti_items(header).await?;
500        self.write_file_names(header).await?;
501        self.write_file_ctimes(header).await?;
502        self.write_file_atimes(header).await?;
503        self.write_file_mtimes(header).await?;
504        self.write_file_windows_attrs(header).await?;
505        AsyncWriteExt::write_all(header, &[K_END]).await?;
506        Ok(())
507    }
508
509    async fn write_file_empty_streams<H: AsyncWrite + Unpin>(
510        &self,
511        header: &mut H,
512    ) -> std::io::Result<()> {
513        let mut has_empty = false;
514        for entry in self.files.iter() {
515            if !entry.has_stream {
516                has_empty = true;
517                break;
518            }
519        }
520        if has_empty {
521            AsyncWriteExt::write_all(header, &[K_EMPTY_STREAM]).await?;
522            let mut bitset = BitSet::with_capacity(self.files.len());
523            for (i, entry) in self.files.iter().enumerate() {
524                if !entry.has_stream {
525                    bitset.insert(i);
526                }
527            }
528            let temp = bitset_to_bytes(&bitset, self.files.len());
529            write_encoded_u64(header, temp.len() as u64).await?;
530            AsyncWriteExt::write_all(header, &temp).await?;
531        }
532        Ok(())
533    }
534
535    async fn write_file_empty_files<H: AsyncWrite + Unpin>(
536        &self,
537        header: &mut H,
538    ) -> std::io::Result<()> {
539        let mut has_empty = false;
540        let mut empty_stream_counter = 0;
541        let mut bitset = BitSet::new();
542        for entry in self.files.iter() {
543            if !entry.has_stream {
544                let is_dir = entry.is_directory();
545                has_empty |= !is_dir;
546                if !is_dir {
547                    bitset.insert(empty_stream_counter);
548                }
549                empty_stream_counter += 1;
550            }
551        }
552        if has_empty {
553            AsyncWriteExt::write_all(header, &[K_EMPTY_FILE]).await?;
554
555            let temp = bitset_to_bytes(&bitset, empty_stream_counter);
556            write_encoded_u64(header, temp.len() as u64).await?;
557            AsyncWriteExt::write_all(header, &temp).await?;
558        }
559        Ok(())
560    }
561
562    async fn write_file_anti_items<H: AsyncWrite + Unpin>(
563        &self,
564        header: &mut H,
565    ) -> std::io::Result<()> {
566        let mut has_anti = false;
567        let mut counter = 0;
568        let mut bitset = BitSet::new();
569        for entry in self.files.iter() {
570            if !entry.has_stream {
571                let is_anti = entry.is_anti_item();
572                has_anti |= !is_anti;
573                if !is_anti {
574                    bitset.insert(counter);
575                }
576                counter += 1;
577            }
578        }
579        if has_anti {
580            AsyncWriteExt::write_all(header, &[K_ANTI]).await?;
581
582            let temp = bitset_to_bytes(&bitset, counter);
583            write_encoded_u64(header, temp.len() as u64).await?;
584            AsyncWriteExt::write_all(header, &temp).await?;
585        }
586        Ok(())
587    }
588
589    async fn write_file_names<H: AsyncWrite + Unpin>(&self, header: &mut H) -> std::io::Result<()> {
590        AsyncWriteExt::write_all(header, &[K_NAME]).await?;
591        let mut temp: Vec<u8> = Vec::with_capacity(128);
592        temp.push(0);
593        for file in self.files.iter() {
594            for c in file.name().encode_utf16() {
595                temp.extend_from_slice(&c.to_le_bytes());
596            }
597            temp.extend_from_slice(&[0u8; 2]);
598        }
599        write_encoded_u64(header, temp.len() as u64).await?;
600        AsyncWriteExt::write_all(header, &temp).await?;
601        Ok(())
602    }
603
    // Per-file property writers generated by `write_times!`. Each one emits its block
    // only when at least one entry carries the corresponding value.

    // Creation times, written as little-endian u64 values.
    write_times!(
        write_file_ctimes,
        K_C_TIME,
        has_creation_date,
        creation_date
    );
    // Last-access times, written as little-endian u64 values.
    write_times!(write_file_atimes, K_A_TIME, has_access_date, access_date);
    // Last-modification times, written as little-endian u64 values.
    write_times!(
        write_file_mtimes,
        K_M_TIME,
        has_last_modified_date,
        last_modified_date
    );
    // Windows attributes, written as little-endian u32 values.
    write_times!(
        write_file_windows_attrs,
        K_WIN_ATTRIBUTES,
        has_windows_attributes,
        windows_attributes,
        write_u32
    );
624}
625
626impl<W: AsyncWrite + AsyncSeek + Unpin> AutoFinish for ArchiveWriter<W> {
627    fn finish_ignore_error(self) {
628        let _ = async_io::block_on(self.finish());
629    }
630}
631
632pub(crate) async fn write_encoded_u64<W: AsyncWrite + Unpin>(
633    header: &mut W,
634    mut value: u64,
635) -> std::io::Result<()> {
636    let mut first = 0u64;
637    let mut mask = 0x80u64;
638    let mut i = 0u8;
639    while (i as usize) < 8 {
640        if value < (1u64 << (7 * (i as usize + 1))) {
641            first |= value >> (8 * i as usize);
642            break;
643        }
644        first |= mask;
645        mask >>= 1;
646        i += 1;
647    }
648    AsyncWriteExt::write_all(header, &[(first & 0xFF) as u8]).await?;
649    while i > 0 {
650        AsyncWriteExt::write_all(header, &[(value & 0xFF) as u8]).await?;
651        value >>= 8;
652        i -= 1;
653    }
654    Ok(())
655}
656
/// Appends the eight little-endian bytes of `value` to `buf`.
fn vec_push_le_u64(buf: &mut Vec<u8>, value: u64) {
    buf.extend(u64::to_le_bytes(value));
}
660
/// Appends the four little-endian bytes of `value` to `buf`.
fn vec_push_le_u32(buf: &mut Vec<u8>, value: u32) {
    buf.extend(u32::to_le_bytes(value));
}
664
665pub(crate) fn bitset_to_bytes(bs: &BitSet, capacity: usize) -> Vec<u8> {
666    let mut out = Vec::with_capacity((capacity / 8).saturating_add(1));
667    let mut cache = 0u8;
668    let mut shift: i32 = 7;
669    for i in 0..capacity {
670        let set = if bs.contains(i) { 1 } else { 0 };
671        cache |= (set as u8) << shift;
672        shift -= 1;
673        if shift < 0 {
674            out.push(cache);
675            shift = 7;
676            cache = 0;
677        }
678    }
679    if shift != 7 {
680        out.push(cache);
681    }
682    out
683}
684
685struct CompressWrapWriter<'a, W> {
686    writer: W,
687    crc: Hasher,
688    cache: Vec<u8>,
689    bytes_written: &'a mut usize,
690}
691
692impl<'a, W> CompressWrapWriter<'a, W> {
693    pub fn new(writer: W, bytes_written: &'a mut usize) -> Self {
694        Self {
695            writer,
696            crc: Hasher::new(),
697            cache: Vec::with_capacity(8192),
698            bytes_written,
699        }
700    }
701
702    pub fn crc_value(&mut self) -> u32 {
703        let crc = std::mem::replace(&mut self.crc, Hasher::new());
704        crc.finalize()
705    }
706}
707
708/* removed sync Write impl to eliminate synchronous bridging */
709
710impl<W: AsyncWrite + Unpin> AsyncWrite for CompressWrapWriter<'_, W> {
711    fn poll_write(
712        mut self: std::pin::Pin<&mut Self>,
713        cx: &mut std::task::Context<'_>,
714        buf: &[u8],
715    ) -> std::task::Poll<std::io::Result<usize>> {
716        let this = &mut *self;
717        this.cache.resize(buf.len(), Default::default());
718        let poll = std::pin::Pin::new(&mut this.writer).poll_write(cx, buf);
719        if let std::task::Poll::Ready(Ok(len)) = &poll {
720            this.crc.update(&buf[..*len]);
721            *this.bytes_written += *len;
722        }
723        poll
724    }
725
726    fn poll_flush(
727        mut self: std::pin::Pin<&mut Self>,
728        cx: &mut std::task::Context<'_>,
729    ) -> std::task::Poll<std::io::Result<()>> {
730        std::pin::Pin::new(&mut self.writer).poll_flush(cx)
731    }
732
733    fn poll_close(
734        mut self: std::pin::Pin<&mut Self>,
735        cx: &mut std::task::Context<'_>,
736    ) -> std::task::Poll<std::io::Result<()>> {
737        std::pin::Pin::new(&mut self.writer).poll_close(cx)
738    }
739}