async_sevenz/
writer.rs

1mod counting_writer;
2#[cfg(not(target_arch = "wasm32"))]
3mod lazy_file_reader;
4mod pack_info;
5mod seq_reader;
6mod source_reader;
7mod unpack_info;
8
9use futures::io::{
10    AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, SeekFrom,
11};
12use std::{cell::Cell, rc::Rc, sync::Arc};
13
14pub(crate) use counting_writer::CountingWriter;
15use crc32fast::Hasher;
16
17#[cfg(not(target_arch = "wasm32"))]
18pub(crate) use self::lazy_file_reader::LazyFileReader;
19pub(crate) use self::seq_reader::SeqReader;
20pub use self::source_reader::SourceReader;
21use self::{pack_info::PackInfo, unpack_info::UnpackInfo};
22use crate::{ArchiveEntry, AutoFinish, AutoFinisher, Error, archive::*, bitset::BitSet, encoder};
23
24macro_rules! write_times {
25    ($fn_name:tt, $nid:expr, $has_time:tt, $time:tt) => {
26        async fn $fn_name<H: AsyncWrite + Unpin>(&self, header: &mut H) -> std::io::Result<()> {
27            let mut num = 0;
28            for entry in self.files.iter() {
29                if entry.$has_time {
30                    num += 1;
31                }
32            }
33            if num > 0 {
34                AsyncWriteExt::write_all(header, &[$nid]).await?;
35                let mut temp: Vec<u8> = Vec::with_capacity(128);
36                if num != self.files.len() {
37                    temp.push(0);
38                    let mut times = BitSet::with_capacity(self.files.len());
39                    for i in 0..self.files.len() {
40                        if self.files[i].$has_time {
41                            times.insert(i);
42                        }
43                    }
44                    let bits = bitset_to_bytes(&times, self.files.len());
45                    temp.extend_from_slice(&bits);
46                } else {
47                    temp.push(1);
48                }
49                temp.push(0);
50                for file in self.files.iter() {
51                    if file.$has_time {
52                        vec_push_le_u64(&mut temp, (file.$time).into());
53                    }
54                }
55                write_encoded_u64(header, temp.len() as u64).await?;
56                AsyncWriteExt::write_all(header, &temp).await?;
57            }
58            Ok(())
59        }
60    };
61    ($fn_name:tt, $nid:expr, $has_time:tt, $time:tt, write_u32) => {
62        async fn $fn_name<H: AsyncWrite + Unpin>(&self, header: &mut H) -> std::io::Result<()> {
63            let mut num = 0;
64            for entry in self.files.iter() {
65                if entry.$has_time {
66                    num += 1;
67                }
68            }
69            if num > 0 {
70                AsyncWriteExt::write_all(header, &[$nid]).await?;
71                let mut temp: Vec<u8> = Vec::with_capacity(128);
72                if num != self.files.len() {
73                    temp.push(0);
74                    let mut times = BitSet::with_capacity(self.files.len());
75                    for i in 0..self.files.len() {
76                        if self.files[i].$has_time {
77                            times.insert(i);
78                        }
79                    }
80                    let bits = bitset_to_bytes(&times, self.files.len());
81                    temp.extend_from_slice(&bits);
82                } else {
83                    temp.push(1);
84                }
85                temp.push(0);
86                for file in self.files.iter() {
87                    if file.$has_time {
88                        vec_push_le_u32(&mut temp, file.$time);
89                    }
90                }
91                write_encoded_u64(header, temp.len() as u64).await?;
92                AsyncWriteExt::write_all(header, &temp).await?;
93            }
94            Ok(())
95        }
96    };
97}
98
99type Result<T> = std::result::Result<T, Error>;
100
101/// Writes a 7z archive file.
102pub struct ArchiveWriter<W: AsyncWrite + AsyncSeek + Unpin> {
103    output: W,
104    files: Vec<ArchiveEntry>,
105    content_methods: Arc<Vec<EncoderConfiguration>>,
106    pack_info: PackInfo,
107    unpack_info: UnpackInfo,
108    encrypt_header: bool,
109}
110
111#[cfg(not(target_arch = "wasm32"))]
112impl ArchiveWriter<futures::io::Cursor<Vec<u8>>> {
113    /// 创建一个基于内存缓冲的 7z 写入器。
114    ///
115    /// 返回使用 `Vec<u8>` 作为底层存储的 `ArchiveWriter`,适合测试或无需落盘的场景。
116    pub async fn create_in_memory() -> Result<Self> {
117        let cursor = futures::io::Cursor::new(Vec::<u8>::new());
118        Self::new(cursor).await
119    }
120}
121
122impl<W: AsyncWrite + AsyncSeek + Unpin> ArchiveWriter<W> {
123    /// Prepares writer to write a 7z archive to.
124    pub async fn new(mut writer: W) -> Result<Self> {
125        AsyncSeekExt::seek(&mut writer, SeekFrom::Start(SIGNATURE_HEADER_SIZE)).await?;
126
127        Ok(Self {
128            output: writer,
129            files: Default::default(),
130            content_methods: Arc::new(vec![EncoderConfiguration::new(EncoderMethod::LZMA2)]),
131            pack_info: Default::default(),
132            unpack_info: Default::default(),
133            encrypt_header: true,
134        })
135    }
136
137    /// Returns a wrapper around `self` that will finish the stream on drop.
138    pub fn auto_finish(self) -> AutoFinisher<Self> {
139        AutoFinisher(Some(self))
140    }
141
142    /// Sets the default compression methods to use for entry data. Default is LZMA2.
143    pub fn set_content_methods(&mut self, content_methods: Vec<EncoderConfiguration>) -> &mut Self {
144        if content_methods.is_empty() {
145            return self;
146        }
147        self.content_methods = Arc::new(content_methods);
148        self
149    }
150
151    /// Whether to enable the encryption of the -header. Default is `true`.
152    pub fn set_encrypt_header(&mut self, enabled: bool) {
153        self.encrypt_header = enabled;
154    }
155
156    /// Non-solid compression - Adds an archive `entry` with data from `reader`.
157    ///
158    /// # Example
159    /// ```no_run
160    /// use std::io::Cursor;
161    /// use std::path::Path;
162    /// use async_sevenz::*;
163    /// let mut sz = smol::block_on(ArchiveWriter::create_in_memory()).expect("create writer ok");
164    /// let src = Path::new("path/to/source.txt");
165    /// let name = "source.txt".to_string();
166    /// let entry = smol::block_on(async {
167    ///     sz
168    ///         .push_archive_entry(
169    ///             ArchiveEntry::from_path(&src, name).await,
170    ///             Some(futures::io::Cursor::new(&b"example"[..])),
171    ///         )
172    ///         .await
173    ///         .expect("ok")
174    /// });
175    /// let compressed_size = entry.compressed_size;
176    /// let _cursor = smol::block_on(sz.finish()).expect("done");
177    /// ```
178    pub async fn push_archive_entry<R: AsyncRead + Unpin>(
179        &mut self,
180        mut entry: ArchiveEntry,
181        reader: Option<R>,
182    ) -> Result<&ArchiveEntry> {
183        if !entry.is_directory {
184            if let Some(mut r) = reader {
185                let mut compressed_len = 0;
186                let mut compressed = CompressWrapWriter::new(&mut self.output, &mut compressed_len);
187
188                let mut more_sizes: Vec<Rc<Cell<usize>>> =
189                    Vec::with_capacity(self.content_methods.len() - 1);
190
191                let (crc, size) = {
192                    let mut w = Self::create_writer(
193                        &self.content_methods,
194                        &mut compressed,
195                        &mut more_sizes,
196                    )?;
197                    let mut write_len = 0;
198                    let mut w = CompressWrapWriter::new(&mut w, &mut write_len);
199                    let mut buf = [0u8; 4096];
200                    loop {
201                        let n = AsyncReadExt::read(&mut r, &mut buf).await.map_err(|e| {
202                            Error::io_msg(e, format!("Encode entry:{}", entry.name()))
203                        })?;
204                        if n == 0 {
205                            break;
206                        }
207                        AsyncWriteExt::write_all(&mut w, &buf[..n])
208                            .await
209                            .map_err(|e| {
210                                Error::io_msg(e, format!("Encode entry:{}", entry.name()))
211                            })?;
212                    }
213                    AsyncWriteExt::flush(&mut w)
214                        .await
215                        .map_err(|e| Error::io_msg(e, format!("Encode entry:{}", entry.name())))?;
216                    AsyncWriteExt::write(&mut w, &[])
217                        .await
218                        .map_err(|e| Error::io_msg(e, format!("Encode entry:{}", entry.name())))?;
219
220                    (w.crc_value(), write_len)
221                };
222                let compressed_crc = compressed.crc_value();
223                entry.has_stream = true;
224                entry.size = size as u64;
225                entry.crc = crc as u64;
226                entry.has_crc = true;
227                entry.compressed_crc = compressed_crc as u64;
228                entry.compressed_size = compressed_len as u64;
229                self.pack_info
230                    .add_stream(compressed_len as u64, compressed_crc);
231
232                let mut sizes = Vec::with_capacity(more_sizes.len() + 1);
233                sizes.extend(more_sizes.iter().map(|s| s.get() as u64));
234                sizes.push(size as u64);
235
236                self.unpack_info
237                    .add(self.content_methods.clone(), sizes, crc);
238
239                self.files.push(entry);
240                return Ok(self.files.last().unwrap());
241            }
242        }
243        entry.has_stream = false;
244        entry.size = 0;
245        entry.compressed_size = 0;
246        entry.has_crc = false;
247        self.files.push(entry);
248        Ok(self.files.last().unwrap())
249    }
250
251    /// Solid compression - packs `entries` into one pack.
252    ///
253    /// # Panics
254    /// * If `entries`'s length not equals to `reader.reader_len()`
255    pub async fn push_archive_entries<R: AsyncRead + Unpin>(
256        &mut self,
257        entries: Vec<ArchiveEntry>,
258        reader: Vec<SourceReader<R>>,
259    ) -> Result<&mut Self> {
260        let mut entries = entries;
261        let mut r = SeqReader::new(reader);
262        assert_eq!(r.reader_len(), entries.len());
263        let mut compressed_len = 0;
264        let mut compressed = CompressWrapWriter::new(&mut self.output, &mut compressed_len);
265        let content_methods = &self.content_methods;
266        let mut more_sizes: Vec<Rc<Cell<usize>>> = Vec::with_capacity(content_methods.len() - 1);
267
268        let (crc, size) = {
269            let mut w = Self::create_writer(content_methods, &mut compressed, &mut more_sizes)?;
270            let mut write_len = 0;
271            let mut w = CompressWrapWriter::new(&mut w, &mut write_len);
272            let mut buf = [0u8; 4096];
273
274            fn entries_names(entries: &[ArchiveEntry]) -> String {
275                let mut names = String::with_capacity(512);
276                for ele in entries.iter() {
277                    names.push_str(&ele.name);
278                    names.push(';');
279                    if names.len() > 512 {
280                        break;
281                    }
282                }
283                names
284            }
285
286            loop {
287                let n = AsyncReadExt::read(&mut r, &mut buf).await.map_err(|e| {
288                    Error::io_msg(e, format!("Encode entries:{}", entries_names(&entries)))
289                })?;
290                if n == 0 {
291                    break;
292                }
293                AsyncWriteExt::write_all(&mut w, &buf[..n])
294                    .await
295                    .map_err(|e| {
296                        Error::io_msg(e, format!("Encode entries:{}", entries_names(&entries)))
297                    })?;
298            }
299            AsyncWriteExt::flush(&mut w).await.map_err(|e| {
300                let mut names = String::with_capacity(512);
301                for ele in entries.iter() {
302                    names.push_str(&ele.name);
303                    names.push(';');
304                    if names.len() > 512 {
305                        break;
306                    }
307                }
308                Error::io_msg(e, format!("Encode entry:{names}"))
309            })?;
310            AsyncWriteExt::write(&mut w, &[]).await.map_err(|e| {
311                Error::io_msg(e, format!("Encode entry:{}", entries_names(&entries)))
312            })?;
313
314            (w.crc_value(), write_len)
315        };
316        let compressed_crc = compressed.crc_value();
317        let mut sub_stream_crcs = Vec::with_capacity(entries.len());
318        let mut sub_stream_sizes = Vec::with_capacity(entries.len());
319        for i in 0..entries.len() {
320            let entry = &mut entries[i];
321            let ri = &r[i];
322            entry.crc = ri.crc_value() as u64;
323            entry.size = ri.read_count() as u64;
324            sub_stream_crcs.push(entry.crc as u32);
325            sub_stream_sizes.push(entry.size);
326            entry.has_crc = true;
327        }
328
329        self.pack_info
330            .add_stream(compressed_len as u64, compressed_crc);
331
332        let mut sizes = Vec::with_capacity(more_sizes.len() + 1);
333        sizes.extend(more_sizes.iter().map(|s| s.get() as u64));
334        sizes.push(size as u64);
335
336        self.unpack_info.add_multiple(
337            content_methods.clone(),
338            sizes,
339            crc,
340            entries.len() as u64,
341            sub_stream_sizes,
342            sub_stream_crcs,
343        );
344
345        self.files.extend(entries);
346        Ok(self)
347    }
348
349    fn create_writer<'a, O: AsyncWrite + Unpin + 'a>(
350        methods: &[EncoderConfiguration],
351        out: O,
352        more_sized: &mut Vec<Rc<Cell<usize>>>,
353    ) -> Result<Box<dyn AsyncWrite + Unpin + 'a>> {
354        let mut encoder: Box<dyn AsyncWrite + Unpin> = Box::new(out);
355        let mut first = true;
356        for mc in methods.iter() {
357            if !first {
358                let counting = CountingWriter::new(encoder);
359                more_sized.push(counting.counting());
360                encoder = Box::new(encoder::add_encoder(counting, mc)?);
361            } else {
362                let counting = CountingWriter::new(encoder);
363                encoder = Box::new(encoder::add_encoder(counting, mc)?);
364            }
365            first = false;
366        }
367        Ok(encoder)
368    }
369
370    /// Finishes the compression.
371    pub async fn finish(mut self) -> std::io::Result<W> {
372        let mut cursor = futures::io::Cursor::new(Vec::with_capacity(64 * 1024));
373        self.write_encoded_header(&mut cursor).await?;
374        let header = cursor.into_inner();
375        let header_pos = AsyncSeekExt::stream_position(&mut self.output).await?;
376        AsyncWriteExt::write_all(&mut self.output, &header).await?;
377        let crc32 = crc32fast::hash(&header);
378        let mut hh = [0u8; SIGNATURE_HEADER_SIZE as usize];
379        hh[0..SEVEN_Z_SIGNATURE.len()].copy_from_slice(SEVEN_Z_SIGNATURE);
380        hh[6] = 0;
381        hh[7] = 4;
382        hh[8..12].copy_from_slice(&0u32.to_le_bytes());
383        let start_header_offset_le = (header_pos - SIGNATURE_HEADER_SIZE).to_le_bytes();
384        hh[12..20].copy_from_slice(&start_header_offset_le);
385        let start_header_len_le = ((header.len() as u64) & 0xFFFF_FFFF).to_le_bytes();
386        hh[20..28].copy_from_slice(&start_header_len_le);
387        hh[28..32].copy_from_slice(&crc32.to_le_bytes());
388        let crc32 = crc32fast::hash(&hh[12..]);
389        hh[8..12].copy_from_slice(&crc32.to_le_bytes());
390
391        AsyncSeekExt::seek(&mut self.output, SeekFrom::Start(0)).await?;
392        AsyncWriteExt::write_all(&mut self.output, &hh).await?;
393        AsyncWriteExt::flush(&mut self.output).await?;
394        Ok(self.output)
395    }
396
397    async fn write_header<H: AsyncWrite + Unpin>(&mut self, header: &mut H) -> std::io::Result<()> {
398        AsyncWriteExt::write_all(header, &[K_HEADER]).await?;
399        AsyncWriteExt::write_all(header, &[K_MAIN_STREAMS_INFO]).await?;
400        self.write_streams_info(header).await?;
401        self.write_files_info(header).await?;
402        AsyncWriteExt::write_all(header, &[K_END]).await?;
403        Ok(())
404    }
405
406    async fn write_encoded_header<H: AsyncWrite + Unpin>(
407        &mut self,
408        header: &mut H,
409    ) -> std::io::Result<()> {
410        let mut raw_header_cursor = futures::io::Cursor::new(Vec::with_capacity(64 * 1024));
411        self.write_header(&mut raw_header_cursor).await?;
412        let raw_header = raw_header_cursor.into_inner();
413        let mut pack_info = PackInfo::default();
414
415        let position = AsyncSeekExt::stream_position(&mut self.output).await?;
416        let pos = position - SIGNATURE_HEADER_SIZE;
417        pack_info.pos = pos;
418
419        let mut more_sizes = vec![];
420        let size = raw_header.len() as u64;
421        let crc32 = crc32fast::hash(&raw_header);
422        let mut methods = vec![];
423
424        if self.encrypt_header {
425            for conf in self.content_methods.iter() {
426                if conf.method.id() == EncoderMethod::AES256_SHA256.id() {
427                    methods.push(conf.clone());
428                    break;
429                }
430            }
431        }
432
433        methods.push(EncoderConfiguration::new(EncoderMethod::LZMA2));
434
435        let methods = Arc::new(methods);
436
437        let mut encoded_cursor = futures::io::Cursor::new(Vec::with_capacity(size as usize / 2));
438
439        let mut compress_size = 0;
440        let mut compressed = CompressWrapWriter::new(&mut encoded_cursor, &mut compress_size);
441        {
442            let mut encoder = Self::create_writer(&methods, &mut compressed, &mut more_sizes)
443                .map_err(std::io::Error::other)?;
444            AsyncWriteExt::write_all(&mut encoder, &raw_header).await?;
445            AsyncWriteExt::flush(&mut encoder).await?;
446            let _ = AsyncWriteExt::write(&mut encoder, &[]).await?;
447        }
448
449        let compress_crc = compressed.crc_value();
450        let compress_size = *compressed.bytes_written;
451        if compress_size as u64 + 20 >= size {
452            AsyncWriteExt::write_all(header, &raw_header).await?;
453            return Ok(());
454        }
455        let encoded_data = encoded_cursor.into_inner();
456        AsyncWriteExt::write_all(&mut self.output, &encoded_data[..compress_size]).await?;
457
458        pack_info.add_stream(compress_size as u64, compress_crc);
459
460        let mut unpack_info = UnpackInfo::default();
461        let mut sizes = Vec::with_capacity(1 + more_sizes.len());
462        sizes.extend(more_sizes.iter().map(|s| s.get() as u64));
463        sizes.push(size);
464        unpack_info.add(methods, sizes, crc32);
465
466        AsyncWriteExt::write_all(header, &[K_ENCODED_HEADER]).await?;
467
468        pack_info.write_to(header).await?;
469        unpack_info.write_to(header).await?;
470        unpack_info.write_substreams(header).await?;
471
472        AsyncWriteExt::write_all(header, &[K_END]).await?;
473
474        Ok(())
475    }
476
477    async fn write_streams_info<H: AsyncWrite + Unpin>(
478        &mut self,
479        header: &mut H,
480    ) -> std::io::Result<()> {
481        if self.pack_info.len() > 0 {
482            self.pack_info.write_to(header).await?;
483            self.unpack_info.write_to(header).await?;
484        }
485        self.unpack_info.write_substreams(header).await?;
486
487        AsyncWriteExt::write_all(header, &[K_END]).await?;
488        Ok(())
489    }
490
491    async fn write_files_info<H: AsyncWrite + Unpin>(&self, header: &mut H) -> std::io::Result<()> {
492        AsyncWriteExt::write_all(header, &[K_FILES_INFO]).await?;
493        write_encoded_u64(header, self.files.len() as u64).await?;
494        self.write_file_empty_streams(header).await?;
495        self.write_file_empty_files(header).await?;
496        self.write_file_anti_items(header).await?;
497        self.write_file_names(header).await?;
498        self.write_file_ctimes(header).await?;
499        self.write_file_atimes(header).await?;
500        self.write_file_mtimes(header).await?;
501        self.write_file_windows_attrs(header).await?;
502        AsyncWriteExt::write_all(header, &[K_END]).await?;
503        Ok(())
504    }
505
506    async fn write_file_empty_streams<H: AsyncWrite + Unpin>(
507        &self,
508        header: &mut H,
509    ) -> std::io::Result<()> {
510        let mut has_empty = false;
511        for entry in self.files.iter() {
512            if !entry.has_stream {
513                has_empty = true;
514                break;
515            }
516        }
517        if has_empty {
518            AsyncWriteExt::write_all(header, &[K_EMPTY_STREAM]).await?;
519            let mut bitset = BitSet::with_capacity(self.files.len());
520            for (i, entry) in self.files.iter().enumerate() {
521                if !entry.has_stream {
522                    bitset.insert(i);
523                }
524            }
525            let temp = bitset_to_bytes(&bitset, self.files.len());
526            write_encoded_u64(header, temp.len() as u64).await?;
527            AsyncWriteExt::write_all(header, &temp).await?;
528        }
529        Ok(())
530    }
531
532    async fn write_file_empty_files<H: AsyncWrite + Unpin>(
533        &self,
534        header: &mut H,
535    ) -> std::io::Result<()> {
536        let mut has_empty = false;
537        let mut empty_stream_counter = 0;
538        let mut bitset = BitSet::new();
539        for entry in self.files.iter() {
540            if !entry.has_stream {
541                let is_dir = entry.is_directory();
542                has_empty |= !is_dir;
543                if !is_dir {
544                    bitset.insert(empty_stream_counter);
545                }
546                empty_stream_counter += 1;
547            }
548        }
549        if has_empty {
550            AsyncWriteExt::write_all(header, &[K_EMPTY_FILE]).await?;
551
552            let temp = bitset_to_bytes(&bitset, empty_stream_counter);
553            write_encoded_u64(header, temp.len() as u64).await?;
554            AsyncWriteExt::write_all(header, &temp).await?;
555        }
556        Ok(())
557    }
558
559    async fn write_file_anti_items<H: AsyncWrite + Unpin>(
560        &self,
561        header: &mut H,
562    ) -> std::io::Result<()> {
563        let mut has_anti = false;
564        let mut counter = 0;
565        let mut bitset = BitSet::new();
566        for entry in self.files.iter() {
567            if !entry.has_stream {
568                let is_anti = entry.is_anti_item();
569                has_anti |= !is_anti;
570                if !is_anti {
571                    bitset.insert(counter);
572                }
573                counter += 1;
574            }
575        }
576        if has_anti {
577            AsyncWriteExt::write_all(header, &[K_ANTI]).await?;
578
579            let temp = bitset_to_bytes(&bitset, counter);
580            write_encoded_u64(header, temp.len() as u64).await?;
581            AsyncWriteExt::write_all(header, &temp).await?;
582        }
583        Ok(())
584    }
585
586    async fn write_file_names<H: AsyncWrite + Unpin>(&self, header: &mut H) -> std::io::Result<()> {
587        AsyncWriteExt::write_all(header, &[K_NAME]).await?;
588        let mut temp: Vec<u8> = Vec::with_capacity(128);
589        temp.push(0);
590        for file in self.files.iter() {
591            for c in file.name().encode_utf16() {
592                temp.extend_from_slice(&c.to_le_bytes());
593            }
594            temp.extend_from_slice(&[0u8; 2]);
595        }
596        write_encoded_u64(header, temp.len() as u64).await?;
597        AsyncWriteExt::write_all(header, &temp).await?;
598        Ok(())
599    }
600
601    write_times!(
602        write_file_ctimes,
603        K_C_TIME,
604        has_creation_date,
605        creation_date
606    );
607    write_times!(write_file_atimes, K_A_TIME, has_access_date, access_date);
608    write_times!(
609        write_file_mtimes,
610        K_M_TIME,
611        has_last_modified_date,
612        last_modified_date
613    );
614    write_times!(
615        write_file_windows_attrs,
616        K_WIN_ATTRIBUTES,
617        has_windows_attributes,
618        windows_attributes,
619        write_u32
620    );
621}
622
623impl<W: AsyncWrite + AsyncSeek + Unpin> AutoFinish for ArchiveWriter<W> {
624    fn finish_ignore_error(self) {
625        let _ = async_io::block_on(self.finish());
626    }
627}
628
629pub(crate) async fn write_encoded_u64<W: AsyncWrite + Unpin>(
630    header: &mut W,
631    mut value: u64,
632) -> std::io::Result<()> {
633    let mut first = 0u64;
634    let mut mask = 0x80u64;
635    let mut i = 0u8;
636    while (i as usize) < 8 {
637        if value < (1u64 << (7 * (i as usize + 1))) {
638            first |= value >> (8 * i as usize);
639            break;
640        }
641        first |= mask;
642        mask >>= 1;
643        i += 1;
644    }
645    AsyncWriteExt::write_all(header, &[(first & 0xFF) as u8]).await?;
646    while i > 0 {
647        AsyncWriteExt::write_all(header, &[(value & 0xFF) as u8]).await?;
648        value >>= 8;
649        i -= 1;
650    }
651    Ok(())
652}
653
654fn vec_push_le_u64(buf: &mut Vec<u8>, value: u64) {
655    buf.extend_from_slice(&value.to_le_bytes());
656}
657
658fn vec_push_le_u32(buf: &mut Vec<u8>, value: u32) {
659    buf.extend_from_slice(&value.to_le_bytes());
660}
661
662pub(crate) fn bitset_to_bytes(bs: &BitSet, capacity: usize) -> Vec<u8> {
663    let mut out = Vec::with_capacity((capacity / 8).saturating_add(1));
664    let mut cache = 0u8;
665    let mut shift: i32 = 7;
666    for i in 0..capacity {
667        let set = if bs.contains(i) { 1 } else { 0 };
668        cache |= (set as u8) << shift;
669        shift -= 1;
670        if shift < 0 {
671            out.push(cache);
672            shift = 7;
673            cache = 0;
674        }
675    }
676    if shift != 7 {
677        out.push(cache);
678    }
679    out
680}
681
682struct CompressWrapWriter<'a, W> {
683    writer: W,
684    crc: Hasher,
685    cache: Vec<u8>,
686    bytes_written: &'a mut usize,
687}
688
689impl<'a, W> CompressWrapWriter<'a, W> {
690    pub fn new(writer: W, bytes_written: &'a mut usize) -> Self {
691        Self {
692            writer,
693            crc: Hasher::new(),
694            cache: Vec::with_capacity(8192),
695            bytes_written,
696        }
697    }
698
699    pub fn crc_value(&mut self) -> u32 {
700        let crc = std::mem::replace(&mut self.crc, Hasher::new());
701        crc.finalize()
702    }
703}
704
705/* removed sync Write impl to eliminate synchronous bridging */
706
707impl<W: AsyncWrite + Unpin> AsyncWrite for CompressWrapWriter<'_, W> {
708    fn poll_write(
709        mut self: std::pin::Pin<&mut Self>,
710        cx: &mut std::task::Context<'_>,
711        buf: &[u8],
712    ) -> std::task::Poll<std::io::Result<usize>> {
713        let this = &mut *self;
714        this.cache.resize(buf.len(), Default::default());
715        let poll = std::pin::Pin::new(&mut this.writer).poll_write(cx, buf);
716        if let std::task::Poll::Ready(Ok(len)) = &poll {
717            this.crc.update(&buf[..*len]);
718            *this.bytes_written += *len;
719        }
720        poll
721    }
722
723    fn poll_flush(
724        mut self: std::pin::Pin<&mut Self>,
725        cx: &mut std::task::Context<'_>,
726    ) -> std::task::Poll<std::io::Result<()>> {
727        std::pin::Pin::new(&mut self.writer).poll_flush(cx)
728    }
729
730    fn poll_close(
731        mut self: std::pin::Pin<&mut Self>,
732        cx: &mut std::task::Context<'_>,
733    ) -> std::task::Poll<std::io::Result<()>> {
734        std::pin::Pin::new(&mut self.writer).poll_close(cx)
735    }
736}