async_sevenz/
reader.rs

1#[cfg(not(target_arch = "wasm32"))]
2use std::path::Path;
3use std::{
4    collections::HashMap,
5    future::Future,
6    io,
7    num::NonZeroUsize,
8    pin::Pin,
9    sync::{Arc, Mutex},
10    task::{Context, Poll},
11};
12
13use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Cursor, SeekFrom};
14
15use async_fs as afs;
16use crc32fast::Hasher;
17use lzma_rust2::filter::bcj2::Bcj2Reader;
18
19use crate::{
20    Password,
21    archive::*,
22    bitset::BitSet,
23    block::*,
24    decoder::{AsyncStdRead, add_decoder},
25    error::Error,
26};
27
/// Memory limit, in KiB, passed to `add_decoder`: effectively unlimited — the
/// largest KiB count whose byte value still fits in `usize`.
const MAX_MEM_LIMIT_KB: usize = usize::MAX >> 10;
29
30pub struct BoundedReader<R: AsyncRead + Unpin> {
31    inner: R,
32    remain: usize,
33}
34
35impl<R: AsyncRead + Unpin> BoundedReader<R> {
36    pub fn new(inner: R, max_size: usize) -> Self {
37        Self {
38            inner,
39            remain: max_size,
40        }
41    }
42}
43
44impl<R: AsyncRead + Unpin> AsyncRead for BoundedReader<R> {
45    fn poll_read(
46        mut self: Pin<&mut Self>,
47        cx: &mut Context<'_>,
48        buf: &mut [u8],
49    ) -> Poll<std::io::Result<usize>> {
50        if self.remain == 0 {
51            return Poll::Ready(Ok(0));
52        }
53        let bound = buf.len().min(self.remain);
54        let poll = Pin::new(&mut self.inner).poll_read(cx, &mut buf[..bound]);
55        if let Poll::Ready(Ok(size)) = &poll {
56            self.remain -= *size;
57        }
58        poll
59    }
60}
61
62/// A special reader that shares it's inner reader with other instances and
63/// needs to re-seek every read operation.
64#[derive(Debug)]
65pub(crate) struct SharedBoundedReader<'a, R> {
66    inner: Arc<Mutex<&'a mut R>>,
67    cur: u64,
68    bounds: (u64, u64),
69}
70
71impl<'a, R> Clone for SharedBoundedReader<'a, R> {
72    fn clone(&self) -> Self {
73        Self {
74            inner: Arc::clone(&self.inner),
75            cur: self.cur,
76            bounds: self.bounds,
77        }
78    }
79}
80
81impl<'a, R: AsyncRead + AsyncSeek + Unpin> AsyncRead for SharedBoundedReader<'a, R> {
82    fn poll_read(
83        mut self: Pin<&mut Self>,
84        cx: &mut Context<'_>,
85        buf: &mut [u8],
86    ) -> Poll<std::io::Result<usize>> {
87        if self.cur >= self.bounds.1 {
88            return Poll::Ready(Ok(0));
89        }
90        let cur = self.cur;
91        let mut inner = self.inner.lock().unwrap();
92        match Pin::new(&mut *inner).poll_seek(cx, SeekFrom::Start(cur)) {
93            Poll::Pending => return Poll::Pending,
94            Poll::Ready(Ok(_)) => {}
95            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
96        }
97        let bound = buf.len().min((self.bounds.1 - cur) as usize);
98        let poll = Pin::new(&mut *inner).poll_read(cx, &mut buf[..bound]);
99        drop(inner);
100        match poll {
101            Poll::Pending => Poll::Pending,
102            Poll::Ready(Ok(size)) => {
103                self.cur += size as u64;
104                Poll::Ready(Ok(size))
105            }
106            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
107        }
108    }
109}
110
111impl<'a, R: AsyncRead + AsyncSeek + Unpin> SharedBoundedReader<'a, R> {
112    fn new(inner: Arc<Mutex<&'a mut R>>, bounds: (u64, u64)) -> Self {
113        Self {
114            inner,
115            cur: bounds.0,
116            bounds,
117        }
118    }
119}
120
121struct Crc32VerifyingReader<R> {
122    inner: R,
123    crc_digest: Hasher,
124    expected_value: u64,
125    remaining: i64,
126}
127
128impl<R> Crc32VerifyingReader<R> {
129    fn new(inner: R, remaining: usize, expected_value: u64) -> Self {
130        Self {
131            inner,
132            crc_digest: Hasher::new(),
133            expected_value,
134            remaining: remaining as i64,
135        }
136    }
137}
138
139// synchronous Read impl removed to prefer async pipeline
140
141impl<R: AsyncRead + Unpin> AsyncRead for Crc32VerifyingReader<R> {
142    fn poll_read(
143        mut self: Pin<&mut Self>,
144        cx: &mut Context<'_>,
145        buf: &mut [u8],
146    ) -> Poll<std::io::Result<usize>> {
147        if self.remaining <= 0 {
148            return Poll::Ready(Ok(0));
149        }
150        let poll = Pin::new(&mut self.inner).poll_read(cx, buf);
151        if let Poll::Ready(Ok(size)) = poll {
152            if size > 0 {
153                self.remaining -= size as i64;
154                self.crc_digest.update(&buf[..size]);
155            }
156            if self.remaining <= 0 {
157                let d = std::mem::replace(&mut self.crc_digest, Hasher::new()).finalize();
158                if d as u64 != self.expected_value {
159                    return Poll::Ready(Err(std::io::Error::other(
160                        Error::ChecksumVerificationFailed,
161                    )));
162                }
163            }
164        }
165        poll
166    }
167}
168
169impl Archive {
170    /// Opens a 7z archive asynchronously from a filesystem path using an empty password.
171    ///
172    /// Returns the parsed `Archive` metadata without decoding file contents.
173    #[cfg(not(target_arch = "wasm32"))]
174    pub async fn open(path: impl AsRef<Path>) -> Result<Archive, Error> {
175        use futures_lite::io::Cursor;
176
177        let data = afs::read(path.as_ref())
178            .await
179            .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
180        let mut cursor = Cursor::new(data);
181        Self::read(&mut cursor, &Password::empty()).await
182    }
183
184    /// Opens a 7z archive asynchronously from a filesystem path using the given password.
185    ///
186    /// Returns the parsed `Archive` metadata without decoding file contents.
187    #[cfg(not(target_arch = "wasm32"))]
188    pub async fn open_with_password(
189        path: impl AsRef<Path>,
190        password: &Password,
191    ) -> Result<Archive, Error> {
192        let data = afs::read(path.as_ref())
193            .await
194            .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
195        let mut cursor = Cursor::new(data);
196        Self::read(&mut cursor, password).await
197    }
198
199    /// Read 7z file archive info use the specified `reader`.
200    ///
201    /// # Parameters
202    /// - `reader`   - the reader of the 7z filr archive
203    /// - `password` - archive password encoded in utf16 little endian
204    ///
205    /// # Example
206    ///
207    /// ```no_run
208    /// use async_sevenz::*;
209    ///
210    /// #[tokio::main]
211    /// async fn main() {
212    ///     let password = Password::from("the password");
213    ///     let archive = Archive::open_with_password("example.7z", &password).await.unwrap();
214    ///
215    ///     for entry in &archive.files {
216    ///         println!("{}", entry.name());
217    ///     }
218    /// }
219    /// ```
220    pub(crate) async fn read<R: AsyncRead + AsyncSeek + Unpin + Send>(
221        reader: &mut R,
222        password: &Password,
223    ) -> Result<Archive, Error> {
224        let reader_len = AsyncSeekExt::seek(reader, SeekFrom::End(0)).await?;
225        AsyncSeekExt::seek(reader, SeekFrom::Start(0)).await?;
226
227        let mut signature = [0; 6];
228        AsyncReadExt::read_exact(reader, &mut signature).await?;
229        if signature != SEVEN_Z_SIGNATURE {
230            return Err(Error::BadSignature(signature));
231        }
232        let mut versions = [0; 2];
233        AsyncReadExt::read_exact(reader, &mut versions).await?;
234        let version_major = versions[0];
235        let version_minor = versions[1];
236        if version_major != 0 {
237            return Err(Error::UnsupportedVersion {
238                major: version_major,
239                minor: version_minor,
240            });
241        }
242
243        let start_header_crc = {
244            let mut buf = [0u8; 4];
245            AsyncReadExt::read_exact(reader, &mut buf).await?;
246            u32::from_le_bytes(buf)
247        };
248
249        let header_valid = if start_header_crc == 0 {
250            let current_position = reader.seek(SeekFrom::Current(0)).await?;
251            let mut buf = [0; 20];
252            AsyncReadExt::read_exact(reader, &mut buf).await?;
253            AsyncSeekExt::seek(reader, SeekFrom::Start(current_position)).await?;
254            buf.iter().any(|a| *a != 0)
255        } else {
256            true
257        };
258        if header_valid {
259            let start_header = Self::read_start_header(reader, start_header_crc).await?;
260            Self::init_archive(reader, start_header, password, true, 1).await
261        } else {
262            Self::try_to_locale_end_header(reader, reader_len, password, 1).await
263        }
264    }
265
266    async fn read_start_header<R: AsyncRead + Unpin>(
267        reader: &mut R,
268        start_header_crc: u32,
269    ) -> Result<StartHeader, Error> {
270        let mut buf = [0; 20];
271        AsyncReadExt::read_exact(reader, &mut buf).await?;
272        let crc32 = crc32fast::hash(&buf);
273        if crc32 != start_header_crc {
274            return Err(Error::ChecksumVerificationFailed);
275        }
276        let offset = u64::from_le_bytes(buf[0..8].try_into().unwrap());
277        let size = u64::from_le_bytes(buf[8..16].try_into().unwrap());
278        let crc = u32::from_le_bytes(buf[16..20].try_into().unwrap());
279        Ok(StartHeader {
280            next_header_offset: offset,
281            next_header_size: size,
282            next_header_crc: crc as u64,
283        })
284    }
285
286    async fn read_header<R: AsyncRead + AsyncSeek + Unpin>(
287        header: &mut R,
288        archive: &mut Archive,
289    ) -> Result<(), Error> {
290        let mut nid = {
291            let mut b = [0u8; 1];
292            AsyncReadExt::read_exact(header, &mut b).await?;
293            b[0]
294        };
295        if nid == K_ARCHIVE_PROPERTIES {
296            Self::read_archive_properties(header).await?;
297            nid = {
298                let mut b = [0u8; 1];
299                AsyncReadExt::read_exact(header, &mut b).await?;
300                b[0]
301            };
302        }
303
304        if nid == K_ADDITIONAL_STREAMS_INFO {
305            return Err(Error::other("Additional streams unsupported"));
306        }
307        if nid == K_MAIN_STREAMS_INFO {
308            Self::read_streams_info(header, archive).await?;
309            nid = {
310                let mut b = [0u8; 1];
311                AsyncReadExt::read_exact(header, &mut b).await?;
312                b[0]
313            };
314        }
315        if nid == K_FILES_INFO {
316            Self::read_files_info(header, archive).await?;
317            nid = {
318                let mut b = [0u8; 1];
319                AsyncReadExt::read_exact(header, &mut b).await?;
320                b[0]
321            };
322        }
323        if nid != K_END {
324            return Err(Error::BadTerminatedHeader(nid));
325        }
326
327        Ok(())
328    }
329
330    async fn read_archive_properties<R: AsyncRead + AsyncSeek + Unpin>(
331        header: &mut R,
332    ) -> Result<(), Error> {
333        let mut nid = {
334            let mut b = [0u8; 1];
335            AsyncReadExt::read_exact(header, &mut b).await?;
336            b[0]
337        };
338        while nid != K_END {
339            let property_size = read_variable_usize(header, "propertySize").await?;
340            AsyncSeekExt::seek(header, SeekFrom::Current(property_size as i64)).await?;
341            nid = {
342                let mut b = [0u8; 1];
343                AsyncReadExt::read_exact(header, &mut b).await?;
344                b[0]
345            };
346        }
347        Ok(())
348    }
349
350    async fn try_to_locale_end_header<R: AsyncRead + AsyncSeek + Unpin + Send>(
351        reader: &mut R,
352        reader_len: u64,
353        password: &Password,
354        thread_count: u32,
355    ) -> Result<Self, Error> {
356        let search_limit = 1024 * 1024;
357        let prev_data_size = reader.seek(SeekFrom::Current(0)).await? + 20;
358        let size = reader_len;
359        let cur_pos = reader.seek(SeekFrom::Current(0)).await?;
360        let min_pos = if cur_pos + search_limit > size {
361            cur_pos
362        } else {
363            size - search_limit
364        };
365        let mut pos = reader_len - 1;
366        while pos > min_pos {
367            pos -= 1;
368
369            AsyncSeekExt::seek(reader, SeekFrom::Start(pos)).await?;
370            let nid = {
371                let mut buf = [0u8; 1];
372                AsyncReadExt::read_exact(reader, &mut buf).await?;
373                buf[0]
374            };
375            if nid == K_ENCODED_HEADER || nid == K_HEADER {
376                let start_header = StartHeader {
377                    next_header_offset: pos - prev_data_size,
378                    next_header_size: reader_len - pos,
379                    next_header_crc: 0,
380                };
381                let result =
382                    Self::init_archive(reader, start_header, password, false, thread_count).await?;
383
384                if !result.files.is_empty() {
385                    return Ok(result);
386                }
387            }
388        }
389        Err(Error::other(
390            "Start header corrupt and unable to guess end header",
391        ))
392    }
393
394    async fn init_archive<R: AsyncRead + AsyncSeek + Unpin + Send>(
395        reader: &mut R,
396        start_header: StartHeader,
397        password: &Password,
398        verify_crc: bool,
399        thread_count: u32,
400    ) -> Result<Self, Error> {
401        if start_header.next_header_size > usize::MAX as u64 {
402            return Err(Error::other(format!(
403                "Cannot handle next_header_size {}",
404                start_header.next_header_size
405            )));
406        }
407
408        let next_header_size_int = start_header.next_header_size as usize;
409
410        AsyncSeekExt::seek(
411            reader,
412            SeekFrom::Start(SIGNATURE_HEADER_SIZE + start_header.next_header_offset),
413        )
414        .await?;
415
416        let mut buf = vec![0; next_header_size_int];
417        AsyncReadExt::read_exact(reader, &mut buf).await?;
418        if verify_crc && crc32fast::hash(&buf) as u64 != start_header.next_header_crc {
419            return Err(Error::NextHeaderCrcMismatch);
420        }
421
422        let mut archive = Archive::default();
423        let nid = buf.first().copied().unwrap_or(0);
424        if nid == K_ENCODED_HEADER {
425            let mut cursor = Cursor::new(&buf[1..]);
426            let (mut out_reader, buf_size) = Self::read_encoded_header(
427                &mut cursor,
428                reader,
429                &mut archive,
430                password,
431                thread_count,
432            )
433            .await?;
434            buf.clear();
435            buf.resize(buf_size, 0);
436            AsyncReadExt::read_exact(&mut out_reader, &mut buf)
437                .await
438                .map_err(|e| Error::bad_password(e, !password.is_empty()))?;
439            archive = Archive::default();
440        }
441        let nid = buf.first().copied().unwrap_or(0);
442        if nid == K_HEADER {
443            let mut header = Cursor::new(&buf[1..]);
444            Self::read_header(&mut header, &mut archive).await?;
445        } else {
446            return Err(Error::other("Broken or unsupported archive: no Header"));
447        }
448
449        archive.is_solid = archive
450            .blocks
451            .iter()
452            .any(|block| block.num_unpack_sub_streams > 1);
453
454        Ok(archive)
455    }
456
457    async fn read_encoded_header<'r, RI: 'r + AsyncRead + AsyncSeek + Unpin + Send>(
458        header: &mut (impl AsyncRead + Unpin),
459        reader: &'r mut RI,
460        archive: &mut Archive,
461        password: &Password,
462        thread_count: u32,
463    ) -> Result<(Box<dyn AsyncRead + Unpin + Send + 'r>, usize), Error> {
464        Self::read_streams_info(header, archive).await?;
465        let block = archive
466            .blocks
467            .first()
468            .ok_or(Error::other("no blocks, can't read encoded header"))?;
469        let first_pack_stream_index = 0;
470        let block_offset = SIGNATURE_HEADER_SIZE + archive.pack_pos;
471        if archive.pack_sizes.is_empty() {
472            return Err(Error::other("no packed streams, can't read encoded header"));
473        }
474
475        AsyncSeekExt::seek(reader, SeekFrom::Start(block_offset)).await?;
476        let coder_len = block.coders.len();
477        let unpack_size = block.get_unpack_size() as usize;
478        let pack_size = archive.pack_sizes[first_pack_stream_index] as usize;
479        let mut decoder: Box<dyn AsyncRead + Unpin + Send> =
480            Box::new(BoundedReader::new(reader, pack_size));
481        let mut decoder = if coder_len > 0 {
482            for (index, coder) in block.ordered_coder_iter() {
483                if coder.num_in_streams != 1 || coder.num_out_streams != 1 {
484                    return Err(Error::other(
485                        "Multi input/output stream coders are not yet supported",
486                    ));
487                }
488                let next = add_decoder(
489                    decoder,
490                    block.get_unpack_size_at_index(index) as usize,
491                    coder,
492                    password,
493                    MAX_MEM_LIMIT_KB,
494                    thread_count,
495                )
496                .await?;
497                decoder = Box::new(next);
498            }
499            decoder
500        } else {
501            decoder
502        };
503        if block.has_crc {
504            decoder = Box::new(Crc32VerifyingReader::new(decoder, unpack_size, block.crc));
505        }
506
507        Ok((decoder, unpack_size))
508    }
509
510    async fn read_streams_info<R: AsyncRead + Unpin>(
511        header: &mut R,
512        archive: &mut Archive,
513    ) -> Result<(), Error> {
514        let mut nid = {
515            let mut b = [0u8; 1];
516            AsyncReadExt::read_exact(header, &mut b).await?;
517            b[0]
518        };
519        if nid == K_PACK_INFO {
520            Self::read_pack_info(header, archive).await?;
521            nid = {
522                let mut b = [0u8; 1];
523                AsyncReadExt::read_exact(header, &mut b).await?;
524                b[0]
525            };
526        }
527
528        if nid == K_UNPACK_INFO {
529            Self::read_unpack_info(header, archive).await?;
530            nid = {
531                let mut b = [0u8; 1];
532                AsyncReadExt::read_exact(header, &mut b).await?;
533                b[0]
534            };
535        } else {
536            archive.blocks.clear();
537        }
538        if nid == K_SUB_STREAMS_INFO {
539            Self::read_sub_streams_info(header, archive).await?;
540            nid = {
541                let mut b = [0u8; 1];
542                AsyncReadExt::read_exact(header, &mut b).await?;
543                b[0]
544            };
545        }
546        if nid != K_END {
547            return Err(Error::BadTerminatedStreamsInfo(nid));
548        }
549
550        Ok(())
551    }
552
553    async fn read_files_info<R: AsyncRead + AsyncSeek + Unpin>(
554        header: &mut R,
555        archive: &mut Archive,
556    ) -> Result<(), Error> {
557        let num_files = read_variable_usize(header, "num files").await?;
558        let mut files: Vec<ArchiveEntry> = vec![Default::default(); num_files];
559
560        let mut is_empty_stream: Option<BitSet> = None;
561        let mut is_empty_file: Option<BitSet> = None;
562        let mut is_anti: Option<BitSet> = None;
563        loop {
564            let prop_type = {
565                let mut b = [0u8; 1];
566                AsyncReadExt::read_exact(header, &mut b).await?;
567                b[0]
568            };
569            if prop_type == 0 {
570                break;
571            }
572            let size = read_variable_u64(header).await?;
573            match prop_type {
574                K_EMPTY_STREAM => {
575                    is_empty_stream = Some(read_bits(header, num_files).await?);
576                }
577                K_EMPTY_FILE => {
578                    let n = if let Some(s) = &is_empty_stream {
579                        s.len()
580                    } else {
581                        return Err(Error::other(
582                            "Header format error: kEmptyStream must appear before kEmptyFile",
583                        ));
584                    };
585                    is_empty_file = Some(read_bits(header, n).await?);
586                }
587                K_ANTI => {
588                    let n = if let Some(s) = is_empty_stream.as_ref() {
589                        s.len()
590                    } else {
591                        return Err(Error::other(
592                            "Header format error: kEmptyStream must appear before kEmptyFile",
593                        ));
594                    };
595                    is_anti = Some(read_bits(header, n).await?);
596                }
597                K_NAME => {
598                    let external = {
599                        let mut b = [0u8; 1];
600                        AsyncReadExt::read_exact(header, &mut b).await?;
601                        b[0]
602                    };
603                    if external != 0 {
604                        return Err(Error::other("Not implemented:external != 0"));
605                    }
606                    if (size - 1) & 1 != 0 {
607                        return Err(Error::other("file names length invalid"));
608                    }
609
610                    let size = assert_usize(size, "file names length")?;
611                    let mut next_file = 0;
612                    let mut read_bytes = 0usize;
613                    let mut cache: Vec<u16> = Vec::with_capacity(16);
614                    let mut buf2 = [0u8; 2];
615                    while read_bytes < size - 1 {
616                        AsyncReadExt::read_exact(header, &mut buf2).await?;
617                        read_bytes += 2;
618                        let u = u16::from_le_bytes(buf2);
619                        if u == 0 {
620                            let s = String::from_utf16(&cache)
621                                .map_err(|e| Error::other(e.to_string()))?;
622                            files[next_file].name = s;
623                            next_file += 1;
624                            cache.clear();
625                        } else {
626                            cache.push(u);
627                        }
628                    }
629                    if next_file != files.len() {
630                        return Err(Error::other("Error parsing file names"));
631                    }
632                }
633                K_C_TIME => {
634                    let times_defined = read_all_or_bits(header, num_files).await?;
635                    let external = {
636                        let mut b = [0u8; 1];
637                        AsyncReadExt::read_exact(header, &mut b).await?;
638                        b[0]
639                    };
640                    if external != 0 {
641                        return Err(Error::other(format!(
642                            "kCTime Unimplemented:external={external}"
643                        )));
644                    }
645                    for (i, file) in files.iter_mut().enumerate() {
646                        file.has_creation_date = times_defined.contains(i);
647                        if file.has_creation_date {
648                            let mut b8 = [0u8; 8];
649                            AsyncReadExt::read_exact(header, &mut b8).await?;
650                            file.creation_date = u64::from_le_bytes(b8).into();
651                        }
652                    }
653                }
654                K_A_TIME => {
655                    let times_defined = read_all_or_bits(header, num_files).await?;
656                    let external = {
657                        let mut b = [0u8; 1];
658                        AsyncReadExt::read_exact(header, &mut b).await?;
659                        b[0]
660                    };
661                    if external != 0 {
662                        return Err(Error::other(format!(
663                            "kATime Unimplemented:external={external}"
664                        )));
665                    }
666                    for (i, file) in files.iter_mut().enumerate() {
667                        file.has_access_date = times_defined.contains(i);
668                        if file.has_access_date {
669                            let mut b8 = [0u8; 8];
670                            AsyncReadExt::read_exact(header, &mut b8).await?;
671                            file.access_date = u64::from_le_bytes(b8).into();
672                        }
673                    }
674                }
675                K_M_TIME => {
676                    let times_defined = read_all_or_bits(header, num_files).await?;
677                    let external = {
678                        let mut b = [0u8; 1];
679                        AsyncReadExt::read_exact(header, &mut b).await?;
680                        b[0]
681                    };
682                    if external != 0 {
683                        return Err(Error::other(format!(
684                            "kMTime Unimplemented:external={external}"
685                        )));
686                    }
687                    for (i, file) in files.iter_mut().enumerate() {
688                        file.has_last_modified_date = times_defined.contains(i);
689                        if file.has_last_modified_date {
690                            let mut b8 = [0u8; 8];
691                            AsyncReadExt::read_exact(header, &mut b8).await?;
692                            file.last_modified_date = u64::from_le_bytes(b8).into();
693                        }
694                    }
695                }
696                K_WIN_ATTRIBUTES => {
697                    let times_defined = read_all_or_bits(header, num_files).await?;
698                    let external = {
699                        let mut b = [0u8; 1];
700                        AsyncReadExt::read_exact(header, &mut b).await?;
701                        b[0]
702                    };
703                    if external != 0 {
704                        return Err(Error::other(format!(
705                            "kWinAttributes Unimplemented:external={external}"
706                        )));
707                    }
708                    for (i, file) in files.iter_mut().enumerate() {
709                        file.has_windows_attributes = times_defined.contains(i);
710                        if file.has_windows_attributes {
711                            let mut b4 = [0u8; 4];
712                            AsyncReadExt::read_exact(header, &mut b4).await?;
713                            file.windows_attributes = u32::from_le_bytes(b4);
714                        }
715                    }
716                }
717                K_START_POS => return Err(Error::other("kStartPos is unsupported, please report")),
718                K_DUMMY => {
719                    AsyncSeekExt::seek(header, SeekFrom::Current(size as i64)).await?;
720                }
721                _ => {
722                    AsyncSeekExt::seek(header, SeekFrom::Current(size as i64)).await?;
723                }
724            };
725        }
726
727        let mut non_empty_file_counter = 0;
728        let mut empty_file_counter = 0;
729        for (i, file) in files.iter_mut().enumerate() {
730            file.has_stream = is_empty_stream
731                .as_ref()
732                .map(|s| !s.contains(i))
733                .unwrap_or(true);
734            if file.has_stream {
735                let sub_stream_info = if let Some(s) = archive.sub_streams_info.as_ref() {
736                    s
737                } else {
738                    return Err(Error::other(
739                        "Archive contains file with streams but no subStreamsInfo",
740                    ));
741                };
742                file.is_directory = false;
743                file.is_anti_item = false;
744                file.has_crc = sub_stream_info.has_crc.contains(non_empty_file_counter);
745                file.crc = sub_stream_info.crcs[non_empty_file_counter];
746                file.size = sub_stream_info.unpack_sizes[non_empty_file_counter];
747                non_empty_file_counter += 1;
748            } else {
749                file.is_directory = if let Some(s) = &is_empty_file {
750                    !s.contains(empty_file_counter)
751                } else {
752                    true
753                };
754                file.is_anti_item = is_anti
755                    .as_ref()
756                    .map(|s| s.contains(empty_file_counter))
757                    .unwrap_or(false);
758                file.has_crc = false;
759                file.size = 0;
760                empty_file_counter += 1;
761            }
762        }
763        archive.files = files;
764
765        Self::calculate_stream_map(archive)?;
766        Ok(())
767    }
768
769    fn calculate_stream_map(archive: &mut Archive) -> Result<(), Error> {
770        let mut stream_map = StreamMap::default();
771
772        let mut next_block_pack_stream_index = 0;
773        let num_blocks = archive.blocks.len();
774        stream_map.block_first_pack_stream_index = vec![0; num_blocks];
775        for i in 0..num_blocks {
776            stream_map.block_first_pack_stream_index[i] = next_block_pack_stream_index;
777            next_block_pack_stream_index += archive.blocks[i].packed_streams.len();
778        }
779
780        let mut next_pack_stream_offset = 0;
781        let num_pack_sizes = archive.pack_sizes.len();
782        stream_map.pack_stream_offsets = vec![0; num_pack_sizes];
783        for i in 0..num_pack_sizes {
784            stream_map.pack_stream_offsets[i] = next_pack_stream_offset;
785            next_pack_stream_offset += archive.pack_sizes[i];
786        }
787
788        stream_map.block_first_file_index = vec![0; num_blocks];
789        stream_map.file_block_index = vec![None; archive.files.len()];
790        let mut next_block_index = 0;
791        let mut next_block_unpack_stream_index = 0;
792        for i in 0..archive.files.len() {
793            if !archive.files[i].has_stream && next_block_unpack_stream_index == 0 {
794                stream_map.file_block_index[i] = None;
795                continue;
796            }
797            if next_block_unpack_stream_index == 0 {
798                while next_block_index < archive.blocks.len() {
799                    stream_map.block_first_file_index[next_block_index] = i;
800                    if archive.blocks[next_block_index].num_unpack_sub_streams > 0 {
801                        break;
802                    }
803                    next_block_index += 1;
804                }
805                if next_block_index >= archive.blocks.len() {
806                    return Err(Error::other("Too few blocks in archive"));
807                }
808            }
809            stream_map.file_block_index[i] = Some(next_block_index);
810            if !archive.files[i].has_stream {
811                continue;
812            }
813
814            //set `compressed_size` of first file in block
815            if stream_map.block_first_file_index[next_block_index] == i {
816                let first_pack_stream_index =
817                    stream_map.block_first_pack_stream_index[next_block_index];
818                let pack_size = archive.pack_sizes[first_pack_stream_index];
819
820                archive.files[i].compressed_size = pack_size;
821            }
822
823            next_block_unpack_stream_index += 1;
824            if next_block_unpack_stream_index
825                >= archive.blocks[next_block_index].num_unpack_sub_streams
826            {
827                next_block_index += 1;
828                next_block_unpack_stream_index = 0;
829            }
830        }
831
832        archive.stream_map = stream_map;
833        Ok(())
834    }
835
836    async fn read_pack_info<R: AsyncRead + Unpin>(
837        header: &mut R,
838        archive: &mut Archive,
839    ) -> Result<(), Error> {
840        archive.pack_pos = read_variable_u64(header).await?;
841        let num_pack_streams = read_variable_usize(header, "num pack streams").await?;
842        let mut nid = {
843            let mut b = [0u8; 1];
844            AsyncReadExt::read_exact(header, &mut b).await?;
845            b[0]
846        };
847        if nid == K_SIZE {
848            archive.pack_sizes = vec![0u64; num_pack_streams];
849            for i in 0..archive.pack_sizes.len() {
850                archive.pack_sizes[i] = read_variable_u64(header).await?;
851            }
852            nid = {
853                let mut b = [0u8; 1];
854                AsyncReadExt::read_exact(header, &mut b).await?;
855                b[0]
856            };
857        }
858
859        if nid == K_CRC {
860            archive.pack_crcs_defined = read_all_or_bits(header, num_pack_streams).await?;
861            archive.pack_crcs = vec![0; num_pack_streams];
862            for i in 0..num_pack_streams {
863                if archive.pack_crcs_defined.contains(i) {
864                    let mut b4 = [0u8; 4];
865                    AsyncReadExt::read_exact(header, &mut b4).await?;
866                    archive.pack_crcs[i] = u32::from_le_bytes(b4) as u64;
867                }
868            }
869            nid = {
870                let mut b = [0u8; 1];
871                AsyncReadExt::read_exact(header, &mut b).await?;
872                b[0]
873            };
874        }
875
876        if nid != K_END {
877            return Err(Error::BadTerminatedPackInfo(nid));
878        }
879
880        Ok(())
881    }
882    async fn read_unpack_info<R: AsyncRead + Unpin>(
883        header: &mut R,
884        archive: &mut Archive,
885    ) -> Result<(), Error> {
886        let nid = {
887            let mut b = [0u8; 1];
888            AsyncReadExt::read_exact(header, &mut b).await?;
889            b[0]
890        };
891        if nid != K_FOLDER {
892            return Err(Error::other(format!("Expected kFolder, got {nid}")));
893        }
894        let num_blocks = read_variable_usize(header, "num blocks").await?;
895
896        archive.blocks.reserve_exact(num_blocks);
897        let external = {
898            let mut b = [0u8; 1];
899            AsyncReadExt::read_exact(header, &mut b).await?;
900            b[0]
901        };
902        if external != 0 {
903            return Err(Error::ExternalUnsupported);
904        }
905
906        for _ in 0..num_blocks {
907            archive.blocks.push(Self::read_block(header).await?);
908        }
909
910        let nid = {
911            let mut b = [0u8; 1];
912            AsyncReadExt::read_exact(header, &mut b).await?;
913            b[0]
914        };
915        if nid != K_CODERS_UNPACK_SIZE {
916            return Err(Error::other(format!(
917                "Expected kCodersUnpackSize, got {nid}"
918            )));
919        }
920
921        for block in archive.blocks.iter_mut() {
922            let tos = block.total_output_streams;
923            block.unpack_sizes.reserve_exact(tos);
924            for _ in 0..tos {
925                block.unpack_sizes.push(read_variable_u64(header).await?);
926            }
927        }
928
929        let mut nid = {
930            let mut b = [0u8; 1];
931            AsyncReadExt::read_exact(header, &mut b).await?;
932            b[0]
933        };
934        if nid == K_CRC {
935            let crcs_defined = read_all_or_bits(header, num_blocks).await?;
936            for i in 0..num_blocks {
937                if crcs_defined.contains(i) {
938                    archive.blocks[i].has_crc = true;
939                    let mut b4 = [0u8; 4];
940                    AsyncReadExt::read_exact(header, &mut b4).await?;
941                    archive.blocks[i].crc = u32::from_le_bytes(b4) as u64;
942                } else {
943                    archive.blocks[i].has_crc = false;
944                }
945            }
946            nid = {
947                let mut b = [0u8; 1];
948                AsyncReadExt::read_exact(header, &mut b).await?;
949                b[0]
950            };
951        }
952        if nid != K_END {
953            return Err(Error::BadTerminatedUnpackInfo);
954        }
955
956        Ok(())
957    }
958
959    async fn read_sub_streams_info<R: AsyncRead + Unpin>(
960        header: &mut R,
961        archive: &mut Archive,
962    ) -> Result<(), Error> {
963        for block in archive.blocks.iter_mut() {
964            block.num_unpack_sub_streams = 1;
965        }
966        let mut total_unpack_streams = archive.blocks.len();
967
968        let mut nid = {
969            let mut b = [0u8; 1];
970            AsyncReadExt::read_exact(header, &mut b).await?;
971            b[0]
972        };
973        if nid == K_NUM_UNPACK_STREAM {
974            total_unpack_streams = 0;
975            for block in archive.blocks.iter_mut() {
976                let num_streams = read_variable_usize(header, "numStreams").await?;
977                block.num_unpack_sub_streams = num_streams;
978                total_unpack_streams += num_streams;
979            }
980            nid = {
981                let mut b = [0u8; 1];
982                AsyncReadExt::read_exact(header, &mut b).await?;
983                b[0]
984            };
985        }
986
987        let mut sub_streams_info = SubStreamsInfo::default();
988        sub_streams_info
989            .unpack_sizes
990            .resize(total_unpack_streams, Default::default());
991        sub_streams_info
992            .has_crc
993            .reserve_len_exact(total_unpack_streams);
994        sub_streams_info.crcs = vec![0; total_unpack_streams];
995
996        let mut next_unpack_stream = 0;
997        for block in archive.blocks.iter() {
998            if block.num_unpack_sub_streams == 0 {
999                continue;
1000            }
1001            let mut sum = 0;
1002            if nid == K_SIZE {
1003                for _i in 0..block.num_unpack_sub_streams - 1 {
1004                    let size = read_variable_u64(header).await?;
1005                    sub_streams_info.unpack_sizes[next_unpack_stream] = size;
1006                    next_unpack_stream += 1;
1007                    sum += size;
1008                }
1009            }
1010            if sum > block.get_unpack_size() {
1011                return Err(Error::other(
1012                    "sum of unpack sizes of block exceeds total unpack size",
1013                ));
1014            }
1015            // Calculate the last size from the total minus the sum of N-1 sizes.
1016            sub_streams_info.unpack_sizes[next_unpack_stream] = block.get_unpack_size() - sum;
1017            next_unpack_stream += 1;
1018        }
1019        if nid == K_SIZE {
1020            nid = {
1021                let mut b = [0u8; 1];
1022                AsyncReadExt::read_exact(header, &mut b).await?;
1023                b[0]
1024            };
1025        }
1026
1027        let mut num_digests = 0;
1028        for block in archive.blocks.iter() {
1029            if block.num_unpack_sub_streams != 1 || !block.has_crc {
1030                num_digests += block.num_unpack_sub_streams;
1031            }
1032        }
1033
1034        if nid == K_CRC {
1035            let has_missing_crc = read_all_or_bits(header, num_digests).await?;
1036            let mut missing_crcs = vec![0; num_digests];
1037            for (i, missing_crc) in missing_crcs.iter_mut().enumerate() {
1038                if has_missing_crc.contains(i) {
1039                    let mut b4 = [0u8; 4];
1040                    AsyncReadExt::read_exact(header, &mut b4).await?;
1041                    *missing_crc = u32::from_le_bytes(b4) as u64;
1042                }
1043            }
1044            let mut next_crc = 0;
1045            let mut next_missing_crc = 0;
1046            for block in archive.blocks.iter() {
1047                if block.num_unpack_sub_streams == 1 && block.has_crc {
1048                    sub_streams_info.has_crc.insert(next_crc);
1049                    sub_streams_info.crcs[next_crc] = block.crc;
1050                    next_crc += 1;
1051                } else {
1052                    for _i in 0..block.num_unpack_sub_streams {
1053                        if has_missing_crc.contains(next_missing_crc) {
1054                            sub_streams_info.has_crc.insert(next_crc);
1055                        } else {
1056                            sub_streams_info.has_crc.remove(next_crc);
1057                        }
1058                        sub_streams_info.crcs[next_crc] = missing_crcs[next_missing_crc];
1059                        next_crc += 1;
1060                        next_missing_crc += 1;
1061                    }
1062                }
1063            }
1064
1065            nid = {
1066                let mut b = [0u8; 1];
1067                AsyncReadExt::read_exact(header, &mut b).await?;
1068                b[0]
1069            };
1070        }
1071
1072        if nid != K_END {
1073            return Err(Error::BadTerminatedSubStreamsInfo);
1074        }
1075
1076        archive.sub_streams_info = Some(sub_streams_info);
1077        Ok(())
1078    }
1079
1080    async fn read_block<R: AsyncRead + Unpin>(header: &mut R) -> Result<Block, Error> {
1081        let mut block = Block::default();
1082
1083        let num_coders = read_variable_usize(header, "num coders").await?;
1084        let mut coders = Vec::with_capacity(num_coders);
1085        let mut total_in_streams = 0;
1086        let mut total_out_streams = 0;
1087        for _i in 0..num_coders {
1088            let mut coder = Coder::default();
1089            let bits = {
1090                let mut b = [0u8; 1];
1091                AsyncReadExt::read_exact(header, &mut b).await?;
1092                b[0]
1093            };
1094            let id_size = bits & 0xF;
1095            let is_simple = (bits & 0x10) == 0;
1096            let has_attributes = (bits & 0x20) != 0;
1097            let more_alternative_methods = (bits & 0x80) != 0;
1098
1099            coder.id_size = id_size as usize;
1100
1101            AsyncReadExt::read_exact(header, coder.decompression_method_id_mut()).await?;
1102            if is_simple {
1103                coder.num_in_streams = 1;
1104                coder.num_out_streams = 1;
1105            } else {
1106                coder.num_in_streams = read_variable_u64(header).await?;
1107                coder.num_out_streams = read_variable_u64(header).await?;
1108            }
1109            total_in_streams += coder.num_in_streams;
1110            total_out_streams += coder.num_out_streams;
1111            if has_attributes {
1112                let properties_size = read_variable_usize(header, "properties size").await?;
1113                let mut props = vec![0u8; properties_size];
1114                AsyncReadExt::read_exact(header, &mut props).await?;
1115                coder.properties = props;
1116            }
1117            coders.push(coder);
1118            // would need to keep looping as above:
1119            if more_alternative_methods {
1120                return Err(Error::other(
1121                    "Alternative methods are unsupported, please report. The reference implementation doesn't support them either.",
1122                ));
1123            }
1124        }
1125        block.coders = coders;
1126        let total_in_streams = assert_usize(total_in_streams, "totalInStreams")?;
1127        let total_out_streams = assert_usize(total_out_streams, "totalOutStreams")?;
1128        block.total_input_streams = total_in_streams;
1129        block.total_output_streams = total_out_streams;
1130
1131        if total_out_streams == 0 {
1132            return Err(Error::other("Total output streams can't be 0"));
1133        }
1134        let num_bind_pairs = total_out_streams - 1;
1135        let mut bind_pairs = Vec::with_capacity(num_bind_pairs);
1136        for _ in 0..num_bind_pairs {
1137            let bp = BindPair {
1138                in_index: read_variable_u64(header).await?,
1139                out_index: read_variable_u64(header).await?,
1140            };
1141            bind_pairs.push(bp);
1142        }
1143        block.bind_pairs = bind_pairs;
1144
1145        if total_in_streams < num_bind_pairs {
1146            return Err(Error::other(
1147                "Total input streams can't be less than the number of bind pairs",
1148            ));
1149        }
1150        let num_packed_streams = total_in_streams - num_bind_pairs;
1151        let mut packed_streams = vec![0; num_packed_streams];
1152        if num_packed_streams == 1 {
1153            let mut index = u64::MAX;
1154            for i in 0..total_in_streams {
1155                if block.find_bind_pair_for_in_stream(i as u64).is_none() {
1156                    index = i as u64;
1157                    break;
1158                }
1159            }
1160            if index == u64::MAX {
1161                return Err(Error::other("Couldn't find stream's bind pair index"));
1162            }
1163            packed_streams[0] = index;
1164        } else {
1165            for packed_stream in packed_streams.iter_mut() {
1166                *packed_stream = read_variable_u64(header).await?;
1167            }
1168        }
1169        block.packed_streams = packed_streams;
1170
1171        Ok(block)
1172    }
1173}
1174
1175#[inline]
1176async fn read_variable_usize<R: AsyncRead + Unpin>(
1177    reader: &mut R,
1178    field: &str,
1179) -> Result<usize, Error> {
1180    let size = read_variable_u64(reader).await?;
1181    assert_usize(size, field)
1182}
1183
1184#[inline]
1185fn assert_usize(size: u64, field: &str) -> Result<usize, Error> {
1186    if size > usize::MAX as u64 {
1187        return Err(Error::other(format!("Cannot handle {field} {size}")));
1188    }
1189    Ok(size as usize)
1190}
1191
1192async fn read_variable_u64<R: AsyncRead + Unpin>(reader: &mut R) -> io::Result<u64> {
1193    let first = {
1194        let mut b = [0u8; 1];
1195        AsyncReadExt::read_exact(reader, &mut b).await?;
1196        b[0] as u64
1197    };
1198    let mut mask = 0x80_u64;
1199    let mut value = 0;
1200    for i in 0..8 {
1201        if (first & mask) == 0 {
1202            return Ok(value | ((first & (mask - 1)) << (8 * i)));
1203        }
1204        let b = {
1205            let mut bb = [0u8; 1];
1206            AsyncReadExt::read_exact(reader, &mut bb).await?;
1207            bb[0] as u64
1208        };
1209        value |= b << (8 * i);
1210        mask >>= 1;
1211    }
1212    Ok(value)
1213}
1214
1215async fn read_all_or_bits<R: AsyncRead + Unpin>(header: &mut R, size: usize) -> io::Result<BitSet> {
1216    let all = {
1217        let mut b = [0u8; 1];
1218        AsyncReadExt::read_exact(header, &mut b).await?;
1219        b[0]
1220    };
1221    if all != 0 {
1222        let mut bits = BitSet::with_capacity(size);
1223        for i in 0..size {
1224            bits.insert(i);
1225        }
1226        Ok(bits)
1227    } else {
1228        read_bits(header, size).await
1229    }
1230}
1231
1232async fn read_bits<R: AsyncRead + Unpin>(header: &mut R, size: usize) -> io::Result<BitSet> {
1233    let mut bits = BitSet::with_capacity(size);
1234    let mut mask = 0u32;
1235    let mut cache = 0u32;
1236    for i in 0..size {
1237        if mask == 0 {
1238            mask = 0x80;
1239            let mut b = [0u8; 1];
1240            AsyncReadExt::read_exact(header, &mut b).await?;
1241            cache = b[0] as u32;
1242        }
1243        if (cache & mask) != 0 {
1244            bits.insert(i);
1245        }
1246        mask >>= 1;
1247    }
1248    Ok(bits)
1249}
1250
/// Location of a file inside the archive, as stored in the name index of [`ArchiveReader`].
#[derive(Copy, Clone)]
struct IndexEntry {
    /// Block holding the file's data. `None` when the stream map assigns the file to no
    /// block (the value is copied from `stream_map.file_block_index`).
    block_index: Option<usize>,
    /// Position of the file in `Archive::files`.
    file_index: usize,
}
1256
/// Reads a 7z archive file.
///
/// Pairs the parsed [`Archive`] metadata with the `source` it describes, so file data can
/// be decoded on demand.
pub struct ArchiveReader<R: AsyncRead + AsyncSeek + Unpin> {
    /// Reader over the raw archive bytes; decoders seek within it to reach packed streams.
    source: R,
    /// Parsed archive metadata (files, blocks, packed streams, stream map).
    archive: Archive,
    /// Password handed to decoders and used when reporting likely bad-password errors.
    password: Password,
    /// Thread count passed to decoders; `set_thread_count` clamps it to `1..=256`.
    thread_count: u32,
    /// Maps each file name to its location; built by `fill_index` (later duplicates win).
    index: HashMap<String, IndexEntry>,
}
1265
1266#[cfg(not(target_arch = "wasm32"))]
1267impl ArchiveReader<Cursor<Vec<u8>>> {
1268    /// Opens a 7z archive file asynchronously and creates an `ArchiveReader` using an in-memory buffer.
1269    pub async fn open(path: impl AsRef<Path>, password: Password) -> Result<Self, Error> {
1270        let data = afs::read(path.as_ref())
1271            .await
1272            .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
1273        let cursor = Cursor::new(data);
1274        Self::new(cursor, password).await
1275    }
1276
1277    /// Opens a 7z archive from in-memory bytes asynchronously.
1278    pub async fn open_from_bytes(data: Vec<u8>, password: Password) -> Result<Self, Error> {
1279        let cursor = Cursor::new(data);
1280        Self::new(cursor, password).await
1281    }
1282}
1283
1284impl<R: AsyncRead + AsyncSeek + Unpin + Send> ArchiveReader<R> {
1285    /// Creates a [`ArchiveReader`] to read a 7z archive file from the given `source` reader.
1286    #[inline]
1287    pub(crate) async fn new(mut source: R, password: Password) -> Result<Self, Error> {
1288        let archive = Archive::read(&mut source, &password).await?;
1289
1290        let mut reader = Self {
1291            source,
1292            archive,
1293            password,
1294            thread_count: 1,
1295            index: HashMap::default(),
1296        };
1297
1298        reader.fill_index();
1299
1300        let thread_count =
1301            std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
1302        reader.set_thread_count(thread_count.get() as u32);
1303
1304        Ok(reader)
1305    }
1306
1307    /// Creates an [`ArchiveReader`] from an existing [`Archive`] instance.
1308    ///
1309    /// This is useful when you already have a parsed archive and want to create a reader
1310    /// without re-parsing the archive structure.
1311    ///
1312    /// # Arguments
1313    /// * `archive` - An existing parsed archive instance
1314    /// * `source` - The reader providing access to the archive data
1315    /// * `password` - Password for encrypted archives
1316    #[inline]
1317    pub fn from_archive(archive: Archive, source: R, password: Password) -> Self {
1318        let mut reader = Self {
1319            source,
1320            archive,
1321            password,
1322            thread_count: 1,
1323            index: HashMap::default(),
1324        };
1325
1326        reader.fill_index();
1327
1328        let thread_count =
1329            std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
1330        reader.set_thread_count(thread_count.get() as u32);
1331
1332        reader
1333    }
1334
1335    /// Sets the thread count to use when multi-threading is supported by the de-compression
1336    /// (currently only LZMA2 if encoded with MT support).
1337    ///
1338    /// Defaults to `std::thread::available_parallelism()` if not set manually.
1339    pub fn set_thread_count(&mut self, thread_count: u32) {
1340        self.thread_count = thread_count.clamp(1, 256);
1341    }
1342
1343    fn fill_index(&mut self) {
1344        for (file_index, file) in self.archive.files.iter().enumerate() {
1345            let block_index = self.archive.stream_map.file_block_index[file_index];
1346
1347            self.index.insert(
1348                file.name.clone(),
1349                IndexEntry {
1350                    block_index,
1351                    file_index,
1352                },
1353            );
1354        }
1355    }
1356
    /// Returns a reference to the underlying [`Archive`] structure.
    ///
    /// This provides access to the archive metadata, including files, blocks,
    /// and compression information, as held by this reader.
    #[inline]
    pub fn archive(&self) -> &Archive {
        &self.archive
    }
1365
1366    async fn build_decode_stack<'r>(
1367        source: &'r mut R,
1368        archive: &Archive,
1369        block_index: usize,
1370        password: &Password,
1371        thread_count: u32,
1372    ) -> Result<(Box<dyn AsyncRead + Unpin + Send + 'r>, usize), Error> {
1373        let block = &archive.blocks[block_index];
1374        if block.total_input_streams > block.total_output_streams {
1375            return Self::build_decode_stack2(source, archive, block_index, password, thread_count);
1376        }
1377        let first_pack_stream_index = archive.stream_map.block_first_pack_stream_index[block_index];
1378        let block_offset = SIGNATURE_HEADER_SIZE
1379            + archive.pack_pos
1380            + archive.stream_map.pack_stream_offsets[first_pack_stream_index];
1381
1382        let (mut has_crc, mut crc) = (block.has_crc, block.crc);
1383
1384        // Single stream blocks might have it's CRC stored in the single substream information.
1385        if !has_crc && block.num_unpack_sub_streams == 1 {
1386            if let Some(sub_streams_info) = archive.sub_streams_info.as_ref() {
1387                let mut substream_index = 0;
1388                for i in 0..block_index {
1389                    substream_index += archive.blocks[i].num_unpack_sub_streams;
1390                }
1391
1392                // Only when there is a single stream, we can use it's CRC to verify the compressed block data.
1393                // Multiple streams would contain the CRC of the compressed data for each file in the block.
1394                if sub_streams_info.has_crc.contains(substream_index) {
1395                    has_crc = true;
1396                    crc = sub_streams_info.crcs[substream_index];
1397                }
1398            }
1399        }
1400
1401        AsyncSeekExt::seek(source, SeekFrom::Start(block_offset)).await?;
1402        let pack_size = archive.pack_sizes[first_pack_stream_index] as usize;
1403
1404        let mut decoder: Box<dyn AsyncRead + Unpin + Send> =
1405            Box::new(BoundedReader::new(source, pack_size));
1406        let block = &archive.blocks[block_index];
1407        for (index, coder) in block.ordered_coder_iter() {
1408            if coder.num_in_streams != 1 || coder.num_out_streams != 1 {
1409                return Err(Error::unsupported(
1410                    "Multi input/output stream coders are not supported",
1411                ));
1412            }
1413            let next = add_decoder(
1414                decoder,
1415                block.get_unpack_size_at_index(index) as usize,
1416                coder,
1417                password,
1418                MAX_MEM_LIMIT_KB,
1419                thread_count,
1420            )
1421            .await?;
1422            decoder = Box::new(next);
1423        }
1424        if has_crc {
1425            decoder = Box::new(Crc32VerifyingReader::new(
1426                decoder,
1427                block.get_unpack_size() as usize,
1428                crc,
1429            ));
1430        }
1431
1432        Ok((decoder, pack_size))
1433    }
1434
1435    fn build_decode_stack2<'r>(
1436        source: &'r mut R,
1437        archive: &Archive,
1438        block_index: usize,
1439        password: &Password,
1440        thread_count: u32,
1441    ) -> Result<(Box<dyn AsyncRead + Unpin + Send + 'r>, usize), Error> {
1442        const MAX_CODER_COUNT: usize = 32;
1443        let block = &archive.blocks[block_index];
1444        if block.coders.len() > MAX_CODER_COUNT {
1445            return Err(Error::unsupported(format!(
1446                "Too many coders: {}",
1447                block.coders.len()
1448            )));
1449        }
1450
1451        assert!(block.total_input_streams > block.total_output_streams);
1452        let shared_source = Arc::new(Mutex::new(source));
1453        let first_pack_stream_index = archive.stream_map.block_first_pack_stream_index[block_index];
1454        let start_pos = SIGNATURE_HEADER_SIZE + archive.pack_pos;
1455        let offsets = &archive.stream_map.pack_stream_offsets[first_pack_stream_index..];
1456
1457        let mut sources = Vec::with_capacity(block.packed_streams.len());
1458
1459        for (i, offset) in offsets[..block.packed_streams.len()].iter().enumerate() {
1460            let pack_pos = start_pos + offset;
1461            let pack_size = archive.pack_sizes[first_pack_stream_index + i];
1462
1463            let pack_reader = SharedBoundedReader::new(
1464                Arc::clone(&shared_source),
1465                (pack_pos, pack_pos + pack_size),
1466            );
1467
1468            sources.push(pack_reader);
1469        }
1470
1471        let mut coder_to_stream_map = [usize::MAX; MAX_CODER_COUNT];
1472
1473        let mut si = 0;
1474        for (i, coder) in block.coders.iter().enumerate() {
1475            coder_to_stream_map[i] = si;
1476            si += coder.num_in_streams as usize;
1477        }
1478
1479        let main_coder_index = {
1480            let mut coder_used = [false; MAX_CODER_COUNT];
1481            for bp in block.bind_pairs.iter() {
1482                coder_used[bp.out_index as usize] = true;
1483            }
1484            let mut mci = 0;
1485            for (i, used) in coder_used[..block.coders.len()].iter().enumerate() {
1486                if !used {
1487                    mci = i;
1488                    break;
1489                }
1490            }
1491            mci
1492        };
1493
1494        let id = block.coders[main_coder_index].encoder_method_id();
1495        if id != EncoderMethod::ID_BCJ2 {
1496            return Err(Error::unsupported(format!("Unsupported method: {id:?}")));
1497        }
1498
1499        let num_in_streams = block.coders[main_coder_index].num_in_streams as usize;
1500        let mut inputs: Vec<Box<dyn AsyncRead + Unpin + Send>> = Vec::with_capacity(num_in_streams);
1501        let start_i = coder_to_stream_map[main_coder_index];
1502        for i in start_i..num_in_streams + start_i {
1503            inputs.push(Self::get_in_stream(
1504                block,
1505                &sources,
1506                &coder_to_stream_map,
1507                password,
1508                i,
1509                thread_count,
1510            )?);
1511        }
1512        let inputs_std = inputs
1513            .into_iter()
1514            .map(crate::util::decompress::AsyncReadSeekAsStd::new)
1515            .collect::<Vec<_>>();
1516        let mut decoder: Box<dyn AsyncRead + Unpin + Send> = Box::new(AsyncStdRead::new(
1517            Bcj2Reader::new(inputs_std, block.get_unpack_size()),
1518        ));
1519        if block.has_crc {
1520            decoder = Box::new(Crc32VerifyingReader::new(
1521                decoder,
1522                block.get_unpack_size() as usize,
1523                block.crc,
1524            ));
1525        }
1526        Ok((
1527            decoder,
1528            archive.pack_sizes[first_pack_stream_index] as usize,
1529        ))
1530    }
1531
1532    fn get_in_stream<'r>(
1533        block: &Block,
1534        sources: &[SharedBoundedReader<'r, R>],
1535        coder_to_stream_map: &[usize],
1536        password: &Password,
1537        in_stream_index: usize,
1538        thread_count: u32,
1539    ) -> Result<Box<dyn AsyncRead + Unpin + Send + 'r>, Error>
1540    where
1541        R: 'r,
1542    {
1543        let index = block
1544            .packed_streams
1545            .iter()
1546            .position(|&i| i == in_stream_index as u64);
1547        if let Some(index) = index {
1548            return Ok(Box::new(sources[index].clone()));
1549        }
1550
1551        let bp = block
1552            .find_bind_pair_for_in_stream(in_stream_index as u64)
1553            .ok_or_else(|| {
1554                Error::other(format!(
1555                    "Couldn't find bind pair for stream {in_stream_index}"
1556                ))
1557            })?;
1558        let index = bp.out_index as usize;
1559
1560        Self::get_in_stream2(
1561            block,
1562            sources,
1563            coder_to_stream_map,
1564            password,
1565            index,
1566            thread_count,
1567        )
1568    }
1569
1570    fn get_in_stream2<'r>(
1571        block: &Block,
1572        sources: &[SharedBoundedReader<'r, R>],
1573        coder_to_stream_map: &[usize],
1574        password: &Password,
1575        in_stream_index: usize,
1576        thread_count: u32,
1577    ) -> Result<Box<dyn AsyncRead + Unpin + Send + 'r>, Error>
1578    where
1579        R: 'r,
1580    {
1581        let coder = &block.coders[in_stream_index];
1582        let start_index = coder_to_stream_map[in_stream_index];
1583        if start_index == usize::MAX {
1584            return Err(Error::other("in_stream_index out of range"));
1585        }
1586        let uncompressed_len = block.unpack_sizes[in_stream_index] as usize;
1587        if coder.num_in_streams == 1 {
1588            let input = Self::get_in_stream(
1589                block,
1590                sources,
1591                coder_to_stream_map,
1592                password,
1593                start_index,
1594                thread_count,
1595            )?;
1596
1597            let decoder = async_io::block_on(add_decoder(
1598                input,
1599                uncompressed_len,
1600                coder,
1601                password,
1602                MAX_MEM_LIMIT_KB,
1603                thread_count,
1604            ))?;
1605            return Ok(Box::new(decoder));
1606        }
1607        Err(Error::unsupported(
1608            "Multi input stream coders are not yet supported",
1609        ))
1610    }
1611
1612    pub(crate) async fn for_each_entries<
1613        F: for<'a> FnMut(
1614            &'a ArchiveEntry,
1615            &'a mut (dyn AsyncRead + Unpin + Send + 'a),
1616        ) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + Send + 'a>>,
1617    >(
1618        &mut self,
1619        mut each: F,
1620    ) -> Result<(), Error> {
1621        let block_count = self.archive.blocks.len();
1622        for block_index in 0..block_count {
1623            let forder_dec = BlockDecoder::new(
1624                self.thread_count,
1625                block_index,
1626                &self.archive,
1627                &self.password,
1628                &mut self.source,
1629            );
1630            if !forder_dec
1631                .for_each_entries(&mut each)
1632                .await
1633                .map_err(|e| e.maybe_bad_password(!self.password.is_empty()))?
1634            {
1635                return Ok(());
1636            }
1637        }
1638        // decode empty files
1639        for file_index in 0..self.archive.files.len() {
1640            let block_index = self.archive.stream_map.file_block_index[file_index];
1641            if block_index.is_none() {
1642                let file = &self.archive.files[file_index];
1643                let mut empty_reader = AsyncStdRead::new([0u8; 0].as_slice());
1644                if !each(file, &mut empty_reader).await? {
1645                    return Ok(());
1646                }
1647            }
1648        }
1649        Ok(())
1650    }
1651
1652    /// Returns the data of a file with the given path inside the archive.
1653    ///
1654    /// # Notice
1655    /// This function is very inefficient when used with solid archives, since
1656    /// it needs to decode all data before the actual file.
1657    pub async fn read_file(&mut self, name: &str) -> Result<Vec<u8>, Error> {
1658        let index_entry = *self.index.get(name).ok_or(Error::FileNotFound)?;
1659        let file = &self.archive.files[index_entry.file_index];
1660
1661        if !file.has_stream {
1662            return Ok(Vec::new());
1663        }
1664
1665        let block_index = index_entry
1666            .block_index
1667            .ok_or_else(|| Error::other("File has no associated block"))?;
1668
1669        match self.archive.is_solid {
1670            true => {
1671                use std::sync::{Arc, Mutex};
1672                let result_cell = Arc::new(Mutex::new(None));
1673                let target_name = name.to_string();
1674
1675                BlockDecoder::new(
1676                    self.thread_count,
1677                    block_index,
1678                    &self.archive,
1679                    &self.password,
1680                    &mut self.source,
1681                )
1682                .for_each_entries(&mut |archive_entry, reader| {
1683                    let result_cell = Arc::clone(&result_cell);
1684                    let target_name = target_name.clone();
1685                    Box::pin(async move {
1686                        let mut data = Vec::with_capacity(archive_entry.size as usize);
1687                        AsyncReadExt::read_to_end(reader, &mut data).await?;
1688                        if archive_entry.name == target_name {
1689                            *result_cell.lock().unwrap() = Some(data);
1690                            Ok(false)
1691                        } else {
1692                            Ok(true)
1693                        }
1694                    })
1695                })
1696                .await?;
1697
1698                let mut guard = result_cell.lock().unwrap();
1699                guard.take().ok_or(Error::FileNotFound)
1700            }
1701            false => {
1702                let pack_index = self.archive.stream_map.block_first_pack_stream_index[block_index];
1703                let pack_offset = self.archive.stream_map.pack_stream_offsets[pack_index];
1704                let block_offset = SIGNATURE_HEADER_SIZE + self.archive.pack_pos + pack_offset;
1705
1706                AsyncSeekExt::seek(&mut self.source, SeekFrom::Start(block_offset)).await?;
1707
1708                let (mut block_reader, _size) = Self::build_decode_stack(
1709                    &mut self.source,
1710                    &self.archive,
1711                    block_index,
1712                    &self.password,
1713                    self.thread_count,
1714                )
1715                .await?;
1716
1717                let mut data = Vec::with_capacity(file.size as usize);
1718                let mut decoder: Box<dyn AsyncRead + Unpin + Send> =
1719                    Box::new(BoundedReader::new(&mut block_reader, file.size as usize));
1720
1721                if file.has_crc {
1722                    decoder = Box::new(Crc32VerifyingReader::new(
1723                        decoder,
1724                        file.size as usize,
1725                        file.crc,
1726                    ));
1727                }
1728
1729                AsyncReadExt::read_to_end(&mut decoder, &mut data).await?;
1730
1731                Ok(data)
1732            }
1733        }
1734    }
1735
1736    /// Get the compression method(s) used for a specific file in the archive.
1737    pub fn file_compression_methods(
1738        &self,
1739        file_name: &str,
1740        methods: &mut Vec<EncoderMethod>,
1741    ) -> Result<(), Error> {
1742        let index_entry = self.index.get(file_name).ok_or(Error::FileNotFound)?;
1743        let file = &self.archive.files[index_entry.file_index];
1744
1745        if !file.has_stream {
1746            return Ok(());
1747        }
1748
1749        let block_index = index_entry
1750            .block_index
1751            .ok_or_else(|| Error::other("File has no associated block"))?;
1752
1753        let block = self
1754            .archive
1755            .blocks
1756            .get(block_index)
1757            .ok_or_else(|| Error::other("Block not found"))?;
1758
1759        block
1760            .coders
1761            .iter()
1762            .filter_map(|coder| EncoderMethod::by_id(coder.encoder_method_id()))
1763            .for_each(|method| {
1764                methods.push(method);
1765            });
1766
1767        Ok(())
1768    }
1769}
1770
/// Decoder for a specific block within a 7z archive.
///
/// Provides access to entries within a single compression block and allows
/// decoding files from that block.
pub struct BlockDecoder<'a, R: AsyncRead + AsyncSeek + Unpin> {
    /// Thread count handed to the decode stack when the block is decoded.
    thread_count: u32,
    /// Index into `archive.blocks` of the block this decoder works on.
    block_index: usize,
    /// Parsed archive metadata (blocks, files and stream map are read from it).
    archive: &'a Archive,
    /// Password used when building the decode stack for this block.
    password: &'a Password,
    /// Reader over the raw archive bytes; borrowed mutably because decoding reads from it.
    source: &'a mut R,
}
1782
1783impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send> BlockDecoder<'a, R> {
1784    /// Creates a new [`BlockDecoder`] for decoding a specific block in the archive.
1785    ///
1786    /// # Arguments
1787    /// * `thread_count` - Number of threads to use for multi-threaded decompression (if supported
1788    ///   by the codec)
1789    /// * `block_index` - Index of the block to decode within the archive
1790    /// * `archive` - Reference to the archive containing the block
1791    /// * `password` - Password for encrypted blocks
1792    /// * `source` - Mutable reference to the reader providing archive data
1793    pub fn new(
1794        thread_count: u32,
1795        block_index: usize,
1796        archive: &'a Archive,
1797        password: &'a Password,
1798        source: &'a mut R,
1799    ) -> Self {
1800        Self {
1801            thread_count,
1802            block_index,
1803            archive,
1804            password,
1805            source,
1806        }
1807    }
1808
1809    /// Sets the thread count to use when multi-threading is supported by the de-compression
1810    /// (currently only LZMA2 if encoded with MT support).
1811    pub fn set_thread_count(&mut self, thread_count: u32) {
1812        self.thread_count = thread_count.clamp(1, 256);
1813    }
1814
1815    /// Returns a slice of archive entries contained in this block.
1816    ///
1817    /// The entries are returned in the order they appear in the block.
1818    pub fn entries(&self) -> &[ArchiveEntry] {
1819        let start = self.archive.stream_map.block_first_file_index[self.block_index];
1820        let file_count = self.archive.blocks[self.block_index].num_unpack_sub_streams;
1821        &self.archive.files[start..(file_count + start)]
1822    }
1823
1824    /// Returns the number of entries contained in this block.
1825    pub fn entry_count(&self) -> usize {
1826        self.archive.blocks[self.block_index].num_unpack_sub_streams
1827    }
1828
1829    /// Takes a closure to decode each files in this block.
1830    ///
1831    /// When decoding files in a block, the data to be decompressed depends on the data in front of
1832    /// it, you cannot simply skip the previous data and only decompress the data in the back.
1833    ///
1834    /// Non-solid archives use one block per file and allow more effective decoding of single files.
1835    pub async fn for_each_entries<
1836        F: for<'b> FnMut(
1837            &'b ArchiveEntry,
1838            &'b mut (dyn AsyncRead + Unpin + Send + 'b),
1839        ) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + Send + 'b>>,
1840    >(
1841        self,
1842        each: &mut F,
1843    ) -> Result<bool, Error> {
1844        let Self {
1845            thread_count,
1846            block_index,
1847            archive,
1848            password,
1849            source,
1850        } = self;
1851        let (mut block_reader, _size) =
1852            ArchiveReader::build_decode_stack(source, archive, block_index, password, thread_count)
1853                .await?;
1854        let start = archive.stream_map.block_first_file_index[block_index];
1855        let file_count = archive.blocks[block_index].num_unpack_sub_streams;
1856
1857        for file_index in start..(file_count + start) {
1858            let file = &archive.files[file_index];
1859            if file.has_stream && file.size > 0 {
1860                let mut decoder: Box<dyn AsyncRead + Unpin + Send> =
1861                    Box::new(BoundedReader::new(&mut block_reader, file.size as usize));
1862                if file.has_crc {
1863                    decoder = Box::new(Crc32VerifyingReader::new(
1864                        decoder,
1865                        file.size as usize,
1866                        file.crc,
1867                    ));
1868                }
1869                {
1870                    let cont = each(file, &mut decoder)
1871                        .await
1872                        .map_err(|e| e.maybe_bad_password(!password.is_empty()))?;
1873                    if !cont {
1874                        return Ok(false);
1875                    }
1876                }
1877            } else {
1878                let mut empty_reader = AsyncStdRead::new([0u8; 0].as_slice());
1879                if !each(file, &mut empty_reader).await? {
1880                    return Ok(false);
1881                }
1882            }
1883        }
1884        Ok(true)
1885    }
1886}