1#[cfg(not(target_arch = "wasm32"))]
2use std::path::Path;
3use std::{
4 collections::HashMap,
5 future::Future,
6 io,
7 num::NonZeroUsize,
8 pin::Pin,
9 sync::{Arc, Mutex},
10 task::{Context, Poll},
11};
12
13use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Cursor, SeekFrom};
14
15use async_fs as afs;
16use crc32fast::Hasher;
17use lzma_rust2::filter::bcj2::Bcj2Reader;
18
19use crate::{
20 Password,
21 archive::*,
22 bitset::BitSet,
23 block::*,
24 decoder::{AsyncStdRead, add_decoder},
25 error::Error,
26};
27
/// Memory limit passed to `add_decoder` when decoding an encoded header.
///
/// Judging by the name the unit is KiB (confirm against `add_decoder`); `usize::MAX / 1024`
/// is then the largest count whose byte equivalent still fits in a `usize`.
const MAX_MEM_LIMIT_KB: usize = usize::MAX / 1024;
29
30pub struct BoundedReader<R: AsyncRead + Unpin> {
31 inner: R,
32 remain: usize,
33}
34
35impl<R: AsyncRead + Unpin> BoundedReader<R> {
36 pub fn new(inner: R, max_size: usize) -> Self {
37 Self {
38 inner,
39 remain: max_size,
40 }
41 }
42}
43
44impl<R: AsyncRead + Unpin> AsyncRead for BoundedReader<R> {
45 fn poll_read(
46 mut self: Pin<&mut Self>,
47 cx: &mut Context<'_>,
48 buf: &mut [u8],
49 ) -> Poll<std::io::Result<usize>> {
50 if self.remain == 0 {
51 return Poll::Ready(Ok(0));
52 }
53 let bound = buf.len().min(self.remain);
54 let poll = Pin::new(&mut self.inner).poll_read(cx, &mut buf[..bound]);
55 if let Poll::Ready(Ok(size)) = &poll {
56 self.remain -= *size;
57 }
58 poll
59 }
60}
61
/// Bounded view onto a reader that is shared through an `Arc<Mutex<..>>`.
///
/// Each view keeps its own absolute position (`cur`) and a `(start, end)` pair (`bounds`);
/// reading reports end-of-stream once `cur` reaches `bounds.1`.
#[derive(Debug)]
pub(crate) struct SharedBoundedReader<'a, R> {
    // The underlying reader, shared between all clones of this view.
    inner: Arc<Mutex<&'a mut R>>,
    // Absolute position in `inner` of the next byte this view will read.
    cur: u64,
    // `(start, end)` absolute positions; `end` is exclusive.
    bounds: (u64, u64),
}

// Written by hand so that cloning needs no `R: Clone` bound: a clone shares the same
// underlying reader (the `Arc` is cloned) and copies the position and bounds.
impl<'a, R> Clone for SharedBoundedReader<'a, R> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            cur: self.cur,
            bounds: self.bounds,
        }
    }
}
80
81impl<'a, R: AsyncRead + AsyncSeek + Unpin> AsyncRead for SharedBoundedReader<'a, R> {
82 fn poll_read(
83 mut self: Pin<&mut Self>,
84 cx: &mut Context<'_>,
85 buf: &mut [u8],
86 ) -> Poll<std::io::Result<usize>> {
87 if self.cur >= self.bounds.1 {
88 return Poll::Ready(Ok(0));
89 }
90 let cur = self.cur;
91 let mut inner = self.inner.lock().unwrap();
92 match Pin::new(&mut *inner).poll_seek(cx, SeekFrom::Start(cur)) {
93 Poll::Pending => return Poll::Pending,
94 Poll::Ready(Ok(_)) => {}
95 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
96 }
97 let bound = buf.len().min((self.bounds.1 - cur) as usize);
98 let poll = Pin::new(&mut *inner).poll_read(cx, &mut buf[..bound]);
99 drop(inner);
100 match poll {
101 Poll::Pending => Poll::Pending,
102 Poll::Ready(Ok(size)) => {
103 self.cur += size as u64;
104 Poll::Ready(Ok(size))
105 }
106 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
107 }
108 }
109}
110
111impl<'a, R: AsyncRead + AsyncSeek + Unpin> SharedBoundedReader<'a, R> {
112 fn new(inner: Arc<Mutex<&'a mut R>>, bounds: (u64, u64)) -> Self {
113 Self {
114 inner,
115 cur: bounds.0,
116 bounds,
117 }
118 }
119}
120
121struct Crc32VerifyingReader<R> {
122 inner: R,
123 crc_digest: Hasher,
124 expected_value: u64,
125 remaining: i64,
126}
127
128impl<R> Crc32VerifyingReader<R> {
129 fn new(inner: R, remaining: usize, expected_value: u64) -> Self {
130 Self {
131 inner,
132 crc_digest: Hasher::new(),
133 expected_value,
134 remaining: remaining as i64,
135 }
136 }
137}
138
139impl<R: AsyncRead + Unpin> AsyncRead for Crc32VerifyingReader<R> {
142 fn poll_read(
143 mut self: Pin<&mut Self>,
144 cx: &mut Context<'_>,
145 buf: &mut [u8],
146 ) -> Poll<std::io::Result<usize>> {
147 if self.remaining <= 0 {
148 return Poll::Ready(Ok(0));
149 }
150 let poll = Pin::new(&mut self.inner).poll_read(cx, buf);
151 if let Poll::Ready(Ok(size)) = poll {
152 if size > 0 {
153 self.remaining -= size as i64;
154 self.crc_digest.update(&buf[..size]);
155 }
156 if self.remaining <= 0 {
157 let d = std::mem::replace(&mut self.crc_digest, Hasher::new()).finalize();
158 if d as u64 != self.expected_value {
159 return Poll::Ready(Err(std::io::Error::other(
160 Error::ChecksumVerificationFailed,
161 )));
162 }
163 }
164 }
165 poll
166 }
167}
168
169impl Archive {
170 #[cfg(not(target_arch = "wasm32"))]
174 pub async fn open(path: impl AsRef<Path>) -> Result<Archive, Error> {
175 use futures_lite::io::Cursor;
176
177 let data = afs::read(path.as_ref())
178 .await
179 .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
180 let mut cursor = Cursor::new(data);
181 Self::read(&mut cursor, &Password::empty()).await
182 }
183
184 #[cfg(not(target_arch = "wasm32"))]
188 pub async fn open_with_password(
189 path: impl AsRef<Path>,
190 password: &Password,
191 ) -> Result<Archive, Error> {
192 let data = afs::read(path.as_ref())
193 .await
194 .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
195 let mut cursor = Cursor::new(data);
196 Self::read(&mut cursor, password).await
197 }
198
    /// Parses an archive from `reader`.
    ///
    /// Validates the 6-byte signature and the version bytes, reads the start-header CRC and
    /// then either parses the regular start header or, when the start header appears to be
    /// zeroed out, falls back to scanning the end of the stream for the header.
    ///
    /// # Errors
    ///
    /// * `Error::BadSignature` if the first six bytes differ from `SEVEN_Z_SIGNATURE`.
    /// * `Error::UnsupportedVersion` if the major version byte is not `0`.
    /// * Any I/O or parse error raised while reading the headers.
    pub(crate) async fn read<R: AsyncRead + AsyncSeek + Unpin + Send>(
        reader: &mut R,
        password: &Password,
    ) -> Result<Archive, Error> {
        // Determine the total stream length up front, then rewind to the beginning.
        let reader_len = AsyncSeekExt::seek(reader, SeekFrom::End(0)).await?;
        AsyncSeekExt::seek(reader, SeekFrom::Start(0)).await?;

        let mut signature = [0; 6];
        AsyncReadExt::read_exact(reader, &mut signature).await?;
        if signature != SEVEN_Z_SIGNATURE {
            return Err(Error::BadSignature(signature));
        }
        let mut versions = [0; 2];
        AsyncReadExt::read_exact(reader, &mut versions).await?;
        let version_major = versions[0];
        let version_minor = versions[1];
        if version_major != 0 {
            return Err(Error::UnsupportedVersion {
                major: version_major,
                minor: version_minor,
            });
        }

        let start_header_crc = {
            let mut buf = [0u8; 4];
            AsyncReadExt::read_exact(reader, &mut buf).await?;
            u32::from_le_bytes(buf)
        };

        // A zero CRC is only trusted if the 20 start-header bytes that follow contain at
        // least one non-zero byte; the peek is undone by seeking back afterwards.
        let header_valid = if start_header_crc == 0 {
            let current_position = reader.seek(SeekFrom::Current(0)).await?;
            let mut buf = [0; 20];
            AsyncReadExt::read_exact(reader, &mut buf).await?;
            AsyncSeekExt::seek(reader, SeekFrom::Start(current_position)).await?;
            buf.iter().any(|a| *a != 0)
        } else {
            true
        };
        if header_valid {
            let start_header = Self::read_start_header(reader, start_header_crc).await?;
            Self::init_archive(reader, start_header, password, true, 1).await
        } else {
            // Start header is all zeroes: try to find the header near the end of the stream.
            Self::try_to_locale_end_header(reader, reader_len, password, 1).await
        }
    }
265
266 async fn read_start_header<R: AsyncRead + Unpin>(
267 reader: &mut R,
268 start_header_crc: u32,
269 ) -> Result<StartHeader, Error> {
270 let mut buf = [0; 20];
271 AsyncReadExt::read_exact(reader, &mut buf).await?;
272 let crc32 = crc32fast::hash(&buf);
273 if crc32 != start_header_crc {
274 return Err(Error::ChecksumVerificationFailed);
275 }
276 let offset = u64::from_le_bytes(buf[0..8].try_into().unwrap());
277 let size = u64::from_le_bytes(buf[8..16].try_into().unwrap());
278 let crc = u32::from_le_bytes(buf[16..20].try_into().unwrap());
279 Ok(StartHeader {
280 next_header_offset: offset,
281 next_header_size: size,
282 next_header_crc: crc as u64,
283 })
284 }
285
286 async fn read_header<R: AsyncRead + AsyncSeek + Unpin>(
287 header: &mut R,
288 archive: &mut Archive,
289 ) -> Result<(), Error> {
290 let mut nid = {
291 let mut b = [0u8; 1];
292 AsyncReadExt::read_exact(header, &mut b).await?;
293 b[0]
294 };
295 if nid == K_ARCHIVE_PROPERTIES {
296 Self::read_archive_properties(header).await?;
297 nid = {
298 let mut b = [0u8; 1];
299 AsyncReadExt::read_exact(header, &mut b).await?;
300 b[0]
301 };
302 }
303
304 if nid == K_ADDITIONAL_STREAMS_INFO {
305 return Err(Error::other("Additional streams unsupported"));
306 }
307 if nid == K_MAIN_STREAMS_INFO {
308 Self::read_streams_info(header, archive).await?;
309 nid = {
310 let mut b = [0u8; 1];
311 AsyncReadExt::read_exact(header, &mut b).await?;
312 b[0]
313 };
314 }
315 if nid == K_FILES_INFO {
316 Self::read_files_info(header, archive).await?;
317 nid = {
318 let mut b = [0u8; 1];
319 AsyncReadExt::read_exact(header, &mut b).await?;
320 b[0]
321 };
322 }
323 if nid != K_END {
324 return Err(Error::BadTerminatedHeader(nid));
325 }
326
327 Ok(())
328 }
329
330 async fn read_archive_properties<R: AsyncRead + AsyncSeek + Unpin>(
331 header: &mut R,
332 ) -> Result<(), Error> {
333 let mut nid = {
334 let mut b = [0u8; 1];
335 AsyncReadExt::read_exact(header, &mut b).await?;
336 b[0]
337 };
338 while nid != K_END {
339 let property_size = read_variable_usize(header, "propertySize").await?;
340 AsyncSeekExt::seek(header, SeekFrom::Current(property_size as i64)).await?;
341 nid = {
342 let mut b = [0u8; 1];
343 AsyncReadExt::read_exact(header, &mut b).await?;
344 b[0]
345 };
346 }
347 Ok(())
348 }
349
350 async fn try_to_locale_end_header<R: AsyncRead + AsyncSeek + Unpin + Send>(
351 reader: &mut R,
352 reader_len: u64,
353 password: &Password,
354 thread_count: u32,
355 ) -> Result<Self, Error> {
356 let search_limit = 1024 * 1024;
357 let prev_data_size = reader.seek(SeekFrom::Current(0)).await? + 20;
358 let size = reader_len;
359 let cur_pos = reader.seek(SeekFrom::Current(0)).await?;
360 let min_pos = if cur_pos + search_limit > size {
361 cur_pos
362 } else {
363 size - search_limit
364 };
365 let mut pos = reader_len - 1;
366 while pos > min_pos {
367 pos -= 1;
368
369 AsyncSeekExt::seek(reader, SeekFrom::Start(pos)).await?;
370 let nid = {
371 let mut buf = [0u8; 1];
372 AsyncReadExt::read_exact(reader, &mut buf).await?;
373 buf[0]
374 };
375 if nid == K_ENCODED_HEADER || nid == K_HEADER {
376 let start_header = StartHeader {
377 next_header_offset: pos - prev_data_size,
378 next_header_size: reader_len - pos,
379 next_header_crc: 0,
380 };
381 let result =
382 Self::init_archive(reader, start_header, password, false, thread_count).await?;
383
384 if !result.files.is_empty() {
385 return Ok(result);
386 }
387 }
388 }
389 Err(Error::other(
390 "Start header corrupt and unable to guess end header",
391 ))
392 }
393
394 async fn init_archive<R: AsyncRead + AsyncSeek + Unpin + Send>(
395 reader: &mut R,
396 start_header: StartHeader,
397 password: &Password,
398 verify_crc: bool,
399 thread_count: u32,
400 ) -> Result<Self, Error> {
401 if start_header.next_header_size > usize::MAX as u64 {
402 return Err(Error::other(format!(
403 "Cannot handle next_header_size {}",
404 start_header.next_header_size
405 )));
406 }
407
408 let next_header_size_int = start_header.next_header_size as usize;
409
410 AsyncSeekExt::seek(
411 reader,
412 SeekFrom::Start(SIGNATURE_HEADER_SIZE + start_header.next_header_offset),
413 )
414 .await?;
415
416 let mut buf = vec![0; next_header_size_int];
417 AsyncReadExt::read_exact(reader, &mut buf).await?;
418 if verify_crc && crc32fast::hash(&buf) as u64 != start_header.next_header_crc {
419 return Err(Error::NextHeaderCrcMismatch);
420 }
421
422 let mut archive = Archive::default();
423 let nid = buf.first().copied().unwrap_or(0);
424 if nid == K_ENCODED_HEADER {
425 let mut cursor = Cursor::new(&buf[1..]);
426 let (mut out_reader, buf_size) = Self::read_encoded_header(
427 &mut cursor,
428 reader,
429 &mut archive,
430 password,
431 thread_count,
432 )
433 .await?;
434 buf.clear();
435 buf.resize(buf_size, 0);
436 AsyncReadExt::read_exact(&mut out_reader, &mut buf)
437 .await
438 .map_err(|e| Error::bad_password(e, !password.is_empty()))?;
439 archive = Archive::default();
440 }
441 let nid = buf.first().copied().unwrap_or(0);
442 if nid == K_HEADER {
443 let mut header = Cursor::new(&buf[1..]);
444 Self::read_header(&mut header, &mut archive).await?;
445 } else {
446 return Err(Error::other("Broken or unsupported archive: no Header"));
447 }
448
449 archive.is_solid = archive
450 .blocks
451 .iter()
452 .any(|block| block.num_unpack_sub_streams > 1);
453
454 Ok(archive)
455 }
456
457 async fn read_encoded_header<'r, RI: 'r + AsyncRead + AsyncSeek + Unpin + Send>(
458 header: &mut (impl AsyncRead + Unpin),
459 reader: &'r mut RI,
460 archive: &mut Archive,
461 password: &Password,
462 thread_count: u32,
463 ) -> Result<(Box<dyn AsyncRead + Unpin + Send + 'r>, usize), Error> {
464 Self::read_streams_info(header, archive).await?;
465 let block = archive
466 .blocks
467 .first()
468 .ok_or(Error::other("no blocks, can't read encoded header"))?;
469 let first_pack_stream_index = 0;
470 let block_offset = SIGNATURE_HEADER_SIZE + archive.pack_pos;
471 if archive.pack_sizes.is_empty() {
472 return Err(Error::other("no packed streams, can't read encoded header"));
473 }
474
475 AsyncSeekExt::seek(reader, SeekFrom::Start(block_offset)).await?;
476 let coder_len = block.coders.len();
477 let unpack_size = block.get_unpack_size() as usize;
478 let pack_size = archive.pack_sizes[first_pack_stream_index] as usize;
479 let mut decoder: Box<dyn AsyncRead + Unpin + Send> =
480 Box::new(BoundedReader::new(reader, pack_size));
481 let mut decoder = if coder_len > 0 {
482 for (index, coder) in block.ordered_coder_iter() {
483 if coder.num_in_streams != 1 || coder.num_out_streams != 1 {
484 return Err(Error::other(
485 "Multi input/output stream coders are not yet supported",
486 ));
487 }
488 let next = add_decoder(
489 decoder,
490 block.get_unpack_size_at_index(index) as usize,
491 coder,
492 password,
493 MAX_MEM_LIMIT_KB,
494 thread_count,
495 )
496 .await?;
497 decoder = Box::new(next);
498 }
499 decoder
500 } else {
501 decoder
502 };
503 if block.has_crc {
504 decoder = Box::new(Crc32VerifyingReader::new(decoder, unpack_size, block.crc));
505 }
506
507 Ok((decoder, unpack_size))
508 }
509
510 async fn read_streams_info<R: AsyncRead + Unpin>(
511 header: &mut R,
512 archive: &mut Archive,
513 ) -> Result<(), Error> {
514 let mut nid = {
515 let mut b = [0u8; 1];
516 AsyncReadExt::read_exact(header, &mut b).await?;
517 b[0]
518 };
519 if nid == K_PACK_INFO {
520 Self::read_pack_info(header, archive).await?;
521 nid = {
522 let mut b = [0u8; 1];
523 AsyncReadExt::read_exact(header, &mut b).await?;
524 b[0]
525 };
526 }
527
528 if nid == K_UNPACK_INFO {
529 Self::read_unpack_info(header, archive).await?;
530 nid = {
531 let mut b = [0u8; 1];
532 AsyncReadExt::read_exact(header, &mut b).await?;
533 b[0]
534 };
535 } else {
536 archive.blocks.clear();
537 }
538 if nid == K_SUB_STREAMS_INFO {
539 Self::read_sub_streams_info(header, archive).await?;
540 nid = {
541 let mut b = [0u8; 1];
542 AsyncReadExt::read_exact(header, &mut b).await?;
543 b[0]
544 };
545 }
546 if nid != K_END {
547 return Err(Error::BadTerminatedStreamsInfo(nid));
548 }
549
550 Ok(())
551 }
552
553 async fn read_files_info<R: AsyncRead + AsyncSeek + Unpin>(
554 header: &mut R,
555 archive: &mut Archive,
556 ) -> Result<(), Error> {
557 let num_files = read_variable_usize(header, "num files").await?;
558 let mut files: Vec<ArchiveEntry> = vec![Default::default(); num_files];
559
560 let mut is_empty_stream: Option<BitSet> = None;
561 let mut is_empty_file: Option<BitSet> = None;
562 let mut is_anti: Option<BitSet> = None;
563 loop {
564 let prop_type = {
565 let mut b = [0u8; 1];
566 AsyncReadExt::read_exact(header, &mut b).await?;
567 b[0]
568 };
569 if prop_type == 0 {
570 break;
571 }
572 let size = read_variable_u64(header).await?;
573 match prop_type {
574 K_EMPTY_STREAM => {
575 is_empty_stream = Some(read_bits(header, num_files).await?);
576 }
577 K_EMPTY_FILE => {
578 let n = if let Some(s) = &is_empty_stream {
579 s.len()
580 } else {
581 return Err(Error::other(
582 "Header format error: kEmptyStream must appear before kEmptyFile",
583 ));
584 };
585 is_empty_file = Some(read_bits(header, n).await?);
586 }
587 K_ANTI => {
588 let n = if let Some(s) = is_empty_stream.as_ref() {
589 s.len()
590 } else {
591 return Err(Error::other(
592 "Header format error: kEmptyStream must appear before kEmptyFile",
593 ));
594 };
595 is_anti = Some(read_bits(header, n).await?);
596 }
597 K_NAME => {
598 let external = {
599 let mut b = [0u8; 1];
600 AsyncReadExt::read_exact(header, &mut b).await?;
601 b[0]
602 };
603 if external != 0 {
604 return Err(Error::other("Not implemented:external != 0"));
605 }
606 if (size - 1) & 1 != 0 {
607 return Err(Error::other("file names length invalid"));
608 }
609
610 let size = assert_usize(size, "file names length")?;
611 let mut next_file = 0;
612 let mut read_bytes = 0usize;
613 let mut cache: Vec<u16> = Vec::with_capacity(16);
614 let mut buf2 = [0u8; 2];
615 while read_bytes < size - 1 {
616 AsyncReadExt::read_exact(header, &mut buf2).await?;
617 read_bytes += 2;
618 let u = u16::from_le_bytes(buf2);
619 if u == 0 {
620 let s = String::from_utf16(&cache)
621 .map_err(|e| Error::other(e.to_string()))?;
622 files[next_file].name = s;
623 next_file += 1;
624 cache.clear();
625 } else {
626 cache.push(u);
627 }
628 }
629 if next_file != files.len() {
630 return Err(Error::other("Error parsing file names"));
631 }
632 }
633 K_C_TIME => {
634 let times_defined = read_all_or_bits(header, num_files).await?;
635 let external = {
636 let mut b = [0u8; 1];
637 AsyncReadExt::read_exact(header, &mut b).await?;
638 b[0]
639 };
640 if external != 0 {
641 return Err(Error::other(format!(
642 "kCTime Unimplemented:external={external}"
643 )));
644 }
645 for (i, file) in files.iter_mut().enumerate() {
646 file.has_creation_date = times_defined.contains(i);
647 if file.has_creation_date {
648 let mut b8 = [0u8; 8];
649 AsyncReadExt::read_exact(header, &mut b8).await?;
650 file.creation_date = u64::from_le_bytes(b8).into();
651 }
652 }
653 }
654 K_A_TIME => {
655 let times_defined = read_all_or_bits(header, num_files).await?;
656 let external = {
657 let mut b = [0u8; 1];
658 AsyncReadExt::read_exact(header, &mut b).await?;
659 b[0]
660 };
661 if external != 0 {
662 return Err(Error::other(format!(
663 "kATime Unimplemented:external={external}"
664 )));
665 }
666 for (i, file) in files.iter_mut().enumerate() {
667 file.has_access_date = times_defined.contains(i);
668 if file.has_access_date {
669 let mut b8 = [0u8; 8];
670 AsyncReadExt::read_exact(header, &mut b8).await?;
671 file.access_date = u64::from_le_bytes(b8).into();
672 }
673 }
674 }
675 K_M_TIME => {
676 let times_defined = read_all_or_bits(header, num_files).await?;
677 let external = {
678 let mut b = [0u8; 1];
679 AsyncReadExt::read_exact(header, &mut b).await?;
680 b[0]
681 };
682 if external != 0 {
683 return Err(Error::other(format!(
684 "kMTime Unimplemented:external={external}"
685 )));
686 }
687 for (i, file) in files.iter_mut().enumerate() {
688 file.has_last_modified_date = times_defined.contains(i);
689 if file.has_last_modified_date {
690 let mut b8 = [0u8; 8];
691 AsyncReadExt::read_exact(header, &mut b8).await?;
692 file.last_modified_date = u64::from_le_bytes(b8).into();
693 }
694 }
695 }
696 K_WIN_ATTRIBUTES => {
697 let times_defined = read_all_or_bits(header, num_files).await?;
698 let external = {
699 let mut b = [0u8; 1];
700 AsyncReadExt::read_exact(header, &mut b).await?;
701 b[0]
702 };
703 if external != 0 {
704 return Err(Error::other(format!(
705 "kWinAttributes Unimplemented:external={external}"
706 )));
707 }
708 for (i, file) in files.iter_mut().enumerate() {
709 file.has_windows_attributes = times_defined.contains(i);
710 if file.has_windows_attributes {
711 let mut b4 = [0u8; 4];
712 AsyncReadExt::read_exact(header, &mut b4).await?;
713 file.windows_attributes = u32::from_le_bytes(b4);
714 }
715 }
716 }
717 K_START_POS => return Err(Error::other("kStartPos is unsupported, please report")),
718 K_DUMMY => {
719 AsyncSeekExt::seek(header, SeekFrom::Current(size as i64)).await?;
720 }
721 _ => {
722 AsyncSeekExt::seek(header, SeekFrom::Current(size as i64)).await?;
723 }
724 };
725 }
726
727 let mut non_empty_file_counter = 0;
728 let mut empty_file_counter = 0;
729 for (i, file) in files.iter_mut().enumerate() {
730 file.has_stream = is_empty_stream
731 .as_ref()
732 .map(|s| !s.contains(i))
733 .unwrap_or(true);
734 if file.has_stream {
735 let sub_stream_info = if let Some(s) = archive.sub_streams_info.as_ref() {
736 s
737 } else {
738 return Err(Error::other(
739 "Archive contains file with streams but no subStreamsInfo",
740 ));
741 };
742 file.is_directory = false;
743 file.is_anti_item = false;
744 file.has_crc = sub_stream_info.has_crc.contains(non_empty_file_counter);
745 file.crc = sub_stream_info.crcs[non_empty_file_counter];
746 file.size = sub_stream_info.unpack_sizes[non_empty_file_counter];
747 non_empty_file_counter += 1;
748 } else {
749 file.is_directory = if let Some(s) = &is_empty_file {
750 !s.contains(empty_file_counter)
751 } else {
752 true
753 };
754 file.is_anti_item = is_anti
755 .as_ref()
756 .map(|s| s.contains(empty_file_counter))
757 .unwrap_or(false);
758 file.has_crc = false;
759 file.size = 0;
760 empty_file_counter += 1;
761 }
762 }
763 archive.files = files;
764
765 Self::calculate_stream_map(archive)?;
766 Ok(())
767 }
768
769 fn calculate_stream_map(archive: &mut Archive) -> Result<(), Error> {
770 let mut stream_map = StreamMap::default();
771
772 let mut next_block_pack_stream_index = 0;
773 let num_blocks = archive.blocks.len();
774 stream_map.block_first_pack_stream_index = vec![0; num_blocks];
775 for i in 0..num_blocks {
776 stream_map.block_first_pack_stream_index[i] = next_block_pack_stream_index;
777 next_block_pack_stream_index += archive.blocks[i].packed_streams.len();
778 }
779
780 let mut next_pack_stream_offset = 0;
781 let num_pack_sizes = archive.pack_sizes.len();
782 stream_map.pack_stream_offsets = vec![0; num_pack_sizes];
783 for i in 0..num_pack_sizes {
784 stream_map.pack_stream_offsets[i] = next_pack_stream_offset;
785 next_pack_stream_offset += archive.pack_sizes[i];
786 }
787
788 stream_map.block_first_file_index = vec![0; num_blocks];
789 stream_map.file_block_index = vec![None; archive.files.len()];
790 let mut next_block_index = 0;
791 let mut next_block_unpack_stream_index = 0;
792 for i in 0..archive.files.len() {
793 if !archive.files[i].has_stream && next_block_unpack_stream_index == 0 {
794 stream_map.file_block_index[i] = None;
795 continue;
796 }
797 if next_block_unpack_stream_index == 0 {
798 while next_block_index < archive.blocks.len() {
799 stream_map.block_first_file_index[next_block_index] = i;
800 if archive.blocks[next_block_index].num_unpack_sub_streams > 0 {
801 break;
802 }
803 next_block_index += 1;
804 }
805 if next_block_index >= archive.blocks.len() {
806 return Err(Error::other("Too few blocks in archive"));
807 }
808 }
809 stream_map.file_block_index[i] = Some(next_block_index);
810 if !archive.files[i].has_stream {
811 continue;
812 }
813
814 if stream_map.block_first_file_index[next_block_index] == i {
816 let first_pack_stream_index =
817 stream_map.block_first_pack_stream_index[next_block_index];
818 let pack_size = archive.pack_sizes[first_pack_stream_index];
819
820 archive.files[i].compressed_size = pack_size;
821 }
822
823 next_block_unpack_stream_index += 1;
824 if next_block_unpack_stream_index
825 >= archive.blocks[next_block_index].num_unpack_sub_streams
826 {
827 next_block_index += 1;
828 next_block_unpack_stream_index = 0;
829 }
830 }
831
832 archive.stream_map = stream_map;
833 Ok(())
834 }
835
836 async fn read_pack_info<R: AsyncRead + Unpin>(
837 header: &mut R,
838 archive: &mut Archive,
839 ) -> Result<(), Error> {
840 archive.pack_pos = read_variable_u64(header).await?;
841 let num_pack_streams = read_variable_usize(header, "num pack streams").await?;
842 let mut nid = {
843 let mut b = [0u8; 1];
844 AsyncReadExt::read_exact(header, &mut b).await?;
845 b[0]
846 };
847 if nid == K_SIZE {
848 archive.pack_sizes = vec![0u64; num_pack_streams];
849 for i in 0..archive.pack_sizes.len() {
850 archive.pack_sizes[i] = read_variable_u64(header).await?;
851 }
852 nid = {
853 let mut b = [0u8; 1];
854 AsyncReadExt::read_exact(header, &mut b).await?;
855 b[0]
856 };
857 }
858
859 if nid == K_CRC {
860 archive.pack_crcs_defined = read_all_or_bits(header, num_pack_streams).await?;
861 archive.pack_crcs = vec![0; num_pack_streams];
862 for i in 0..num_pack_streams {
863 if archive.pack_crcs_defined.contains(i) {
864 let mut b4 = [0u8; 4];
865 AsyncReadExt::read_exact(header, &mut b4).await?;
866 archive.pack_crcs[i] = u32::from_le_bytes(b4) as u64;
867 }
868 }
869 nid = {
870 let mut b = [0u8; 1];
871 AsyncReadExt::read_exact(header, &mut b).await?;
872 b[0]
873 };
874 }
875
876 if nid != K_END {
877 return Err(Error::BadTerminatedPackInfo(nid));
878 }
879
880 Ok(())
881 }
882 async fn read_unpack_info<R: AsyncRead + Unpin>(
883 header: &mut R,
884 archive: &mut Archive,
885 ) -> Result<(), Error> {
886 let nid = {
887 let mut b = [0u8; 1];
888 AsyncReadExt::read_exact(header, &mut b).await?;
889 b[0]
890 };
891 if nid != K_FOLDER {
892 return Err(Error::other(format!("Expected kFolder, got {nid}")));
893 }
894 let num_blocks = read_variable_usize(header, "num blocks").await?;
895
896 archive.blocks.reserve_exact(num_blocks);
897 let external = {
898 let mut b = [0u8; 1];
899 AsyncReadExt::read_exact(header, &mut b).await?;
900 b[0]
901 };
902 if external != 0 {
903 return Err(Error::ExternalUnsupported);
904 }
905
906 for _ in 0..num_blocks {
907 archive.blocks.push(Self::read_block(header).await?);
908 }
909
910 let nid = {
911 let mut b = [0u8; 1];
912 AsyncReadExt::read_exact(header, &mut b).await?;
913 b[0]
914 };
915 if nid != K_CODERS_UNPACK_SIZE {
916 return Err(Error::other(format!(
917 "Expected kCodersUnpackSize, got {nid}"
918 )));
919 }
920
921 for block in archive.blocks.iter_mut() {
922 let tos = block.total_output_streams;
923 block.unpack_sizes.reserve_exact(tos);
924 for _ in 0..tos {
925 block.unpack_sizes.push(read_variable_u64(header).await?);
926 }
927 }
928
929 let mut nid = {
930 let mut b = [0u8; 1];
931 AsyncReadExt::read_exact(header, &mut b).await?;
932 b[0]
933 };
934 if nid == K_CRC {
935 let crcs_defined = read_all_or_bits(header, num_blocks).await?;
936 for i in 0..num_blocks {
937 if crcs_defined.contains(i) {
938 archive.blocks[i].has_crc = true;
939 let mut b4 = [0u8; 4];
940 AsyncReadExt::read_exact(header, &mut b4).await?;
941 archive.blocks[i].crc = u32::from_le_bytes(b4) as u64;
942 } else {
943 archive.blocks[i].has_crc = false;
944 }
945 }
946 nid = {
947 let mut b = [0u8; 1];
948 AsyncReadExt::read_exact(header, &mut b).await?;
949 b[0]
950 };
951 }
952 if nid != K_END {
953 return Err(Error::BadTerminatedUnpackInfo);
954 }
955
956 Ok(())
957 }
958
959 async fn read_sub_streams_info<R: AsyncRead + Unpin>(
960 header: &mut R,
961 archive: &mut Archive,
962 ) -> Result<(), Error> {
963 for block in archive.blocks.iter_mut() {
964 block.num_unpack_sub_streams = 1;
965 }
966 let mut total_unpack_streams = archive.blocks.len();
967
968 let mut nid = {
969 let mut b = [0u8; 1];
970 AsyncReadExt::read_exact(header, &mut b).await?;
971 b[0]
972 };
973 if nid == K_NUM_UNPACK_STREAM {
974 total_unpack_streams = 0;
975 for block in archive.blocks.iter_mut() {
976 let num_streams = read_variable_usize(header, "numStreams").await?;
977 block.num_unpack_sub_streams = num_streams;
978 total_unpack_streams += num_streams;
979 }
980 nid = {
981 let mut b = [0u8; 1];
982 AsyncReadExt::read_exact(header, &mut b).await?;
983 b[0]
984 };
985 }
986
987 let mut sub_streams_info = SubStreamsInfo::default();
988 sub_streams_info
989 .unpack_sizes
990 .resize(total_unpack_streams, Default::default());
991 sub_streams_info
992 .has_crc
993 .reserve_len_exact(total_unpack_streams);
994 sub_streams_info.crcs = vec![0; total_unpack_streams];
995
996 let mut next_unpack_stream = 0;
997 for block in archive.blocks.iter() {
998 if block.num_unpack_sub_streams == 0 {
999 continue;
1000 }
1001 let mut sum = 0;
1002 if nid == K_SIZE {
1003 for _i in 0..block.num_unpack_sub_streams - 1 {
1004 let size = read_variable_u64(header).await?;
1005 sub_streams_info.unpack_sizes[next_unpack_stream] = size;
1006 next_unpack_stream += 1;
1007 sum += size;
1008 }
1009 }
1010 if sum > block.get_unpack_size() {
1011 return Err(Error::other(
1012 "sum of unpack sizes of block exceeds total unpack size",
1013 ));
1014 }
1015 sub_streams_info.unpack_sizes[next_unpack_stream] = block.get_unpack_size() - sum;
1017 next_unpack_stream += 1;
1018 }
1019 if nid == K_SIZE {
1020 nid = {
1021 let mut b = [0u8; 1];
1022 AsyncReadExt::read_exact(header, &mut b).await?;
1023 b[0]
1024 };
1025 }
1026
1027 let mut num_digests = 0;
1028 for block in archive.blocks.iter() {
1029 if block.num_unpack_sub_streams != 1 || !block.has_crc {
1030 num_digests += block.num_unpack_sub_streams;
1031 }
1032 }
1033
1034 if nid == K_CRC {
1035 let has_missing_crc = read_all_or_bits(header, num_digests).await?;
1036 let mut missing_crcs = vec![0; num_digests];
1037 for (i, missing_crc) in missing_crcs.iter_mut().enumerate() {
1038 if has_missing_crc.contains(i) {
1039 let mut b4 = [0u8; 4];
1040 AsyncReadExt::read_exact(header, &mut b4).await?;
1041 *missing_crc = u32::from_le_bytes(b4) as u64;
1042 }
1043 }
1044 let mut next_crc = 0;
1045 let mut next_missing_crc = 0;
1046 for block in archive.blocks.iter() {
1047 if block.num_unpack_sub_streams == 1 && block.has_crc {
1048 sub_streams_info.has_crc.insert(next_crc);
1049 sub_streams_info.crcs[next_crc] = block.crc;
1050 next_crc += 1;
1051 } else {
1052 for _i in 0..block.num_unpack_sub_streams {
1053 if has_missing_crc.contains(next_missing_crc) {
1054 sub_streams_info.has_crc.insert(next_crc);
1055 } else {
1056 sub_streams_info.has_crc.remove(next_crc);
1057 }
1058 sub_streams_info.crcs[next_crc] = missing_crcs[next_missing_crc];
1059 next_crc += 1;
1060 next_missing_crc += 1;
1061 }
1062 }
1063 }
1064
1065 nid = {
1066 let mut b = [0u8; 1];
1067 AsyncReadExt::read_exact(header, &mut b).await?;
1068 b[0]
1069 };
1070 }
1071
1072 if nid != K_END {
1073 return Err(Error::BadTerminatedSubStreamsInfo);
1074 }
1075
1076 archive.sub_streams_info = Some(sub_streams_info);
1077 Ok(())
1078 }
1079
1080 async fn read_block<R: AsyncRead + Unpin>(header: &mut R) -> Result<Block, Error> {
1081 let mut block = Block::default();
1082
1083 let num_coders = read_variable_usize(header, "num coders").await?;
1084 let mut coders = Vec::with_capacity(num_coders);
1085 let mut total_in_streams = 0;
1086 let mut total_out_streams = 0;
1087 for _i in 0..num_coders {
1088 let mut coder = Coder::default();
1089 let bits = {
1090 let mut b = [0u8; 1];
1091 AsyncReadExt::read_exact(header, &mut b).await?;
1092 b[0]
1093 };
1094 let id_size = bits & 0xF;
1095 let is_simple = (bits & 0x10) == 0;
1096 let has_attributes = (bits & 0x20) != 0;
1097 let more_alternative_methods = (bits & 0x80) != 0;
1098
1099 coder.id_size = id_size as usize;
1100
1101 AsyncReadExt::read_exact(header, coder.decompression_method_id_mut()).await?;
1102 if is_simple {
1103 coder.num_in_streams = 1;
1104 coder.num_out_streams = 1;
1105 } else {
1106 coder.num_in_streams = read_variable_u64(header).await?;
1107 coder.num_out_streams = read_variable_u64(header).await?;
1108 }
1109 total_in_streams += coder.num_in_streams;
1110 total_out_streams += coder.num_out_streams;
1111 if has_attributes {
1112 let properties_size = read_variable_usize(header, "properties size").await?;
1113 let mut props = vec![0u8; properties_size];
1114 AsyncReadExt::read_exact(header, &mut props).await?;
1115 coder.properties = props;
1116 }
1117 coders.push(coder);
1118 if more_alternative_methods {
1120 return Err(Error::other(
1121 "Alternative methods are unsupported, please report. The reference implementation doesn't support them either.",
1122 ));
1123 }
1124 }
1125 block.coders = coders;
1126 let total_in_streams = assert_usize(total_in_streams, "totalInStreams")?;
1127 let total_out_streams = assert_usize(total_out_streams, "totalOutStreams")?;
1128 block.total_input_streams = total_in_streams;
1129 block.total_output_streams = total_out_streams;
1130
1131 if total_out_streams == 0 {
1132 return Err(Error::other("Total output streams can't be 0"));
1133 }
1134 let num_bind_pairs = total_out_streams - 1;
1135 let mut bind_pairs = Vec::with_capacity(num_bind_pairs);
1136 for _ in 0..num_bind_pairs {
1137 let bp = BindPair {
1138 in_index: read_variable_u64(header).await?,
1139 out_index: read_variable_u64(header).await?,
1140 };
1141 bind_pairs.push(bp);
1142 }
1143 block.bind_pairs = bind_pairs;
1144
1145 if total_in_streams < num_bind_pairs {
1146 return Err(Error::other(
1147 "Total input streams can't be less than the number of bind pairs",
1148 ));
1149 }
1150 let num_packed_streams = total_in_streams - num_bind_pairs;
1151 let mut packed_streams = vec![0; num_packed_streams];
1152 if num_packed_streams == 1 {
1153 let mut index = u64::MAX;
1154 for i in 0..total_in_streams {
1155 if block.find_bind_pair_for_in_stream(i as u64).is_none() {
1156 index = i as u64;
1157 break;
1158 }
1159 }
1160 if index == u64::MAX {
1161 return Err(Error::other("Couldn't find stream's bind pair index"));
1162 }
1163 packed_streams[0] = index;
1164 } else {
1165 for packed_stream in packed_streams.iter_mut() {
1166 *packed_stream = read_variable_u64(header).await?;
1167 }
1168 }
1169 block.packed_streams = packed_streams;
1170
1171 Ok(block)
1172 }
1173}
1174
1175#[inline]
1176async fn read_variable_usize<R: AsyncRead + Unpin>(
1177 reader: &mut R,
1178 field: &str,
1179) -> Result<usize, Error> {
1180 let size = read_variable_u64(reader).await?;
1181 assert_usize(size, field)
1182}
1183
1184#[inline]
1185fn assert_usize(size: u64, field: &str) -> Result<usize, Error> {
1186 if size > usize::MAX as u64 {
1187 return Err(Error::other(format!("Cannot handle {field} {size}")));
1188 }
1189 Ok(size as usize)
1190}
1191
1192async fn read_variable_u64<R: AsyncRead + Unpin>(reader: &mut R) -> io::Result<u64> {
1193 let first = {
1194 let mut b = [0u8; 1];
1195 AsyncReadExt::read_exact(reader, &mut b).await?;
1196 b[0] as u64
1197 };
1198 let mut mask = 0x80_u64;
1199 let mut value = 0;
1200 for i in 0..8 {
1201 if (first & mask) == 0 {
1202 return Ok(value | ((first & (mask - 1)) << (8 * i)));
1203 }
1204 let b = {
1205 let mut bb = [0u8; 1];
1206 AsyncReadExt::read_exact(reader, &mut bb).await?;
1207 bb[0] as u64
1208 };
1209 value |= b << (8 * i);
1210 mask >>= 1;
1211 }
1212 Ok(value)
1213}
1214
1215async fn read_all_or_bits<R: AsyncRead + Unpin>(header: &mut R, size: usize) -> io::Result<BitSet> {
1216 let all = {
1217 let mut b = [0u8; 1];
1218 AsyncReadExt::read_exact(header, &mut b).await?;
1219 b[0]
1220 };
1221 if all != 0 {
1222 let mut bits = BitSet::with_capacity(size);
1223 for i in 0..size {
1224 bits.insert(i);
1225 }
1226 Ok(bits)
1227 } else {
1228 read_bits(header, size).await
1229 }
1230}
1231
1232async fn read_bits<R: AsyncRead + Unpin>(header: &mut R, size: usize) -> io::Result<BitSet> {
1233 let mut bits = BitSet::with_capacity(size);
1234 let mut mask = 0u32;
1235 let mut cache = 0u32;
1236 for i in 0..size {
1237 if mask == 0 {
1238 mask = 0x80;
1239 let mut b = [0u8; 1];
1240 AsyncReadExt::read_exact(header, &mut b).await?;
1241 cache = b[0] as u32;
1242 }
1243 if (cache & mask) != 0 {
1244 bits.insert(i);
1245 }
1246 mask >>= 1;
1247 }
1248 Ok(bits)
1249}
1250
/// Value stored in the reader's name lookup table for one archive entry.
#[derive(Clone, Copy)]
struct IndexEntry {
    /// Index of the block associated with the entry, or `None` when there is none.
    block_index: Option<usize>,
    /// Index of the entry within the archive's file list.
    file_index: usize,
}
1256
1257pub struct ArchiveReader<R: AsyncRead + AsyncSeek + Unpin> {
1259 source: R,
1260 archive: Archive,
1261 password: Password,
1262 thread_count: u32,
1263 index: HashMap<String, IndexEntry>,
1264}
1265
1266#[cfg(not(target_arch = "wasm32"))]
1267impl ArchiveReader<Cursor<Vec<u8>>> {
1268 pub async fn open(path: impl AsRef<Path>, password: Password) -> Result<Self, Error> {
1270 let data = afs::read(path.as_ref())
1271 .await
1272 .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
1273 let cursor = Cursor::new(data);
1274 Self::new(cursor, password).await
1275 }
1276
1277 pub async fn open_from_bytes(data: Vec<u8>, password: Password) -> Result<Self, Error> {
1279 let cursor = Cursor::new(data);
1280 Self::new(cursor, password).await
1281 }
1282}
1283
1284impl<R: AsyncRead + AsyncSeek + Unpin + Send> ArchiveReader<R> {
1285 #[inline]
1287 pub(crate) async fn new(mut source: R, password: Password) -> Result<Self, Error> {
1288 let archive = Archive::read(&mut source, &password).await?;
1289
1290 let mut reader = Self {
1291 source,
1292 archive,
1293 password,
1294 thread_count: 1,
1295 index: HashMap::default(),
1296 };
1297
1298 reader.fill_index();
1299
1300 let thread_count =
1301 std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
1302 reader.set_thread_count(thread_count.get() as u32);
1303
1304 Ok(reader)
1305 }
1306
1307 #[inline]
1317 pub fn from_archive(archive: Archive, source: R, password: Password) -> Self {
1318 let mut reader = Self {
1319 source,
1320 archive,
1321 password,
1322 thread_count: 1,
1323 index: HashMap::default(),
1324 };
1325
1326 reader.fill_index();
1327
1328 let thread_count =
1329 std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
1330 reader.set_thread_count(thread_count.get() as u32);
1331
1332 reader
1333 }
1334
1335 pub fn set_thread_count(&mut self, thread_count: u32) {
1340 self.thread_count = thread_count.clamp(1, 256);
1341 }
1342
1343 fn fill_index(&mut self) {
1344 for (file_index, file) in self.archive.files.iter().enumerate() {
1345 let block_index = self.archive.stream_map.file_block_index[file_index];
1346
1347 self.index.insert(
1348 file.name.clone(),
1349 IndexEntry {
1350 block_index,
1351 file_index,
1352 },
1353 );
1354 }
1355 }
1356
    /// Returns a shared reference to the [`Archive`] metadata held by this reader.
    #[inline]
    pub fn archive(&self) -> &Archive {
        &self.archive
    }
1365
1366 async fn build_decode_stack<'r>(
1367 source: &'r mut R,
1368 archive: &Archive,
1369 block_index: usize,
1370 password: &Password,
1371 thread_count: u32,
1372 ) -> Result<(Box<dyn AsyncRead + Unpin + Send + 'r>, usize), Error> {
1373 let block = &archive.blocks[block_index];
1374 if block.total_input_streams > block.total_output_streams {
1375 return Self::build_decode_stack2(source, archive, block_index, password, thread_count);
1376 }
1377 let first_pack_stream_index = archive.stream_map.block_first_pack_stream_index[block_index];
1378 let block_offset = SIGNATURE_HEADER_SIZE
1379 + archive.pack_pos
1380 + archive.stream_map.pack_stream_offsets[first_pack_stream_index];
1381
1382 let (mut has_crc, mut crc) = (block.has_crc, block.crc);
1383
1384 if !has_crc && block.num_unpack_sub_streams == 1 {
1386 if let Some(sub_streams_info) = archive.sub_streams_info.as_ref() {
1387 let mut substream_index = 0;
1388 for i in 0..block_index {
1389 substream_index += archive.blocks[i].num_unpack_sub_streams;
1390 }
1391
1392 if sub_streams_info.has_crc.contains(substream_index) {
1395 has_crc = true;
1396 crc = sub_streams_info.crcs[substream_index];
1397 }
1398 }
1399 }
1400
1401 AsyncSeekExt::seek(source, SeekFrom::Start(block_offset)).await?;
1402 let pack_size = archive.pack_sizes[first_pack_stream_index] as usize;
1403
1404 let mut decoder: Box<dyn AsyncRead + Unpin + Send> =
1405 Box::new(BoundedReader::new(source, pack_size));
1406 let block = &archive.blocks[block_index];
1407 for (index, coder) in block.ordered_coder_iter() {
1408 if coder.num_in_streams != 1 || coder.num_out_streams != 1 {
1409 return Err(Error::unsupported(
1410 "Multi input/output stream coders are not supported",
1411 ));
1412 }
1413 let next = add_decoder(
1414 decoder,
1415 block.get_unpack_size_at_index(index) as usize,
1416 coder,
1417 password,
1418 MAX_MEM_LIMIT_KB,
1419 thread_count,
1420 )
1421 .await?;
1422 decoder = Box::new(next);
1423 }
1424 if has_crc {
1425 decoder = Box::new(Crc32VerifyingReader::new(
1426 decoder,
1427 block.get_unpack_size() as usize,
1428 crc,
1429 ));
1430 }
1431
1432 Ok((decoder, pack_size))
1433 }
1434
1435 fn build_decode_stack2<'r>(
1436 source: &'r mut R,
1437 archive: &Archive,
1438 block_index: usize,
1439 password: &Password,
1440 thread_count: u32,
1441 ) -> Result<(Box<dyn AsyncRead + Unpin + Send + 'r>, usize), Error> {
1442 const MAX_CODER_COUNT: usize = 32;
1443 let block = &archive.blocks[block_index];
1444 if block.coders.len() > MAX_CODER_COUNT {
1445 return Err(Error::unsupported(format!(
1446 "Too many coders: {}",
1447 block.coders.len()
1448 )));
1449 }
1450
1451 assert!(block.total_input_streams > block.total_output_streams);
1452 let shared_source = Arc::new(Mutex::new(source));
1453 let first_pack_stream_index = archive.stream_map.block_first_pack_stream_index[block_index];
1454 let start_pos = SIGNATURE_HEADER_SIZE + archive.pack_pos;
1455 let offsets = &archive.stream_map.pack_stream_offsets[first_pack_stream_index..];
1456
1457 let mut sources = Vec::with_capacity(block.packed_streams.len());
1458
1459 for (i, offset) in offsets[..block.packed_streams.len()].iter().enumerate() {
1460 let pack_pos = start_pos + offset;
1461 let pack_size = archive.pack_sizes[first_pack_stream_index + i];
1462
1463 let pack_reader = SharedBoundedReader::new(
1464 Arc::clone(&shared_source),
1465 (pack_pos, pack_pos + pack_size),
1466 );
1467
1468 sources.push(pack_reader);
1469 }
1470
1471 let mut coder_to_stream_map = [usize::MAX; MAX_CODER_COUNT];
1472
1473 let mut si = 0;
1474 for (i, coder) in block.coders.iter().enumerate() {
1475 coder_to_stream_map[i] = si;
1476 si += coder.num_in_streams as usize;
1477 }
1478
1479 let main_coder_index = {
1480 let mut coder_used = [false; MAX_CODER_COUNT];
1481 for bp in block.bind_pairs.iter() {
1482 coder_used[bp.out_index as usize] = true;
1483 }
1484 let mut mci = 0;
1485 for (i, used) in coder_used[..block.coders.len()].iter().enumerate() {
1486 if !used {
1487 mci = i;
1488 break;
1489 }
1490 }
1491 mci
1492 };
1493
1494 let id = block.coders[main_coder_index].encoder_method_id();
1495 if id != EncoderMethod::ID_BCJ2 {
1496 return Err(Error::unsupported(format!("Unsupported method: {id:?}")));
1497 }
1498
1499 let num_in_streams = block.coders[main_coder_index].num_in_streams as usize;
1500 let mut inputs: Vec<Box<dyn AsyncRead + Unpin + Send>> = Vec::with_capacity(num_in_streams);
1501 let start_i = coder_to_stream_map[main_coder_index];
1502 for i in start_i..num_in_streams + start_i {
1503 inputs.push(Self::get_in_stream(
1504 block,
1505 &sources,
1506 &coder_to_stream_map,
1507 password,
1508 i,
1509 thread_count,
1510 )?);
1511 }
1512 let inputs_std = inputs
1513 .into_iter()
1514 .map(crate::util::decompress::AsyncReadSeekAsStd::new)
1515 .collect::<Vec<_>>();
1516 let mut decoder: Box<dyn AsyncRead + Unpin + Send> = Box::new(AsyncStdRead::new(
1517 Bcj2Reader::new(inputs_std, block.get_unpack_size()),
1518 ));
1519 if block.has_crc {
1520 decoder = Box::new(Crc32VerifyingReader::new(
1521 decoder,
1522 block.get_unpack_size() as usize,
1523 block.crc,
1524 ));
1525 }
1526 Ok((
1527 decoder,
1528 archive.pack_sizes[first_pack_stream_index] as usize,
1529 ))
1530 }
1531
1532 fn get_in_stream<'r>(
1533 block: &Block,
1534 sources: &[SharedBoundedReader<'r, R>],
1535 coder_to_stream_map: &[usize],
1536 password: &Password,
1537 in_stream_index: usize,
1538 thread_count: u32,
1539 ) -> Result<Box<dyn AsyncRead + Unpin + Send + 'r>, Error>
1540 where
1541 R: 'r,
1542 {
1543 let index = block
1544 .packed_streams
1545 .iter()
1546 .position(|&i| i == in_stream_index as u64);
1547 if let Some(index) = index {
1548 return Ok(Box::new(sources[index].clone()));
1549 }
1550
1551 let bp = block
1552 .find_bind_pair_for_in_stream(in_stream_index as u64)
1553 .ok_or_else(|| {
1554 Error::other(format!(
1555 "Couldn't find bind pair for stream {in_stream_index}"
1556 ))
1557 })?;
1558 let index = bp.out_index as usize;
1559
1560 Self::get_in_stream2(
1561 block,
1562 sources,
1563 coder_to_stream_map,
1564 password,
1565 index,
1566 thread_count,
1567 )
1568 }
1569
1570 fn get_in_stream2<'r>(
1571 block: &Block,
1572 sources: &[SharedBoundedReader<'r, R>],
1573 coder_to_stream_map: &[usize],
1574 password: &Password,
1575 in_stream_index: usize,
1576 thread_count: u32,
1577 ) -> Result<Box<dyn AsyncRead + Unpin + Send + 'r>, Error>
1578 where
1579 R: 'r,
1580 {
1581 let coder = &block.coders[in_stream_index];
1582 let start_index = coder_to_stream_map[in_stream_index];
1583 if start_index == usize::MAX {
1584 return Err(Error::other("in_stream_index out of range"));
1585 }
1586 let uncompressed_len = block.unpack_sizes[in_stream_index] as usize;
1587 if coder.num_in_streams == 1 {
1588 let input = Self::get_in_stream(
1589 block,
1590 sources,
1591 coder_to_stream_map,
1592 password,
1593 start_index,
1594 thread_count,
1595 )?;
1596
1597 let decoder = async_io::block_on(add_decoder(
1598 input,
1599 uncompressed_len,
1600 coder,
1601 password,
1602 MAX_MEM_LIMIT_KB,
1603 thread_count,
1604 ))?;
1605 return Ok(Box::new(decoder));
1606 }
1607 Err(Error::unsupported(
1608 "Multi input stream coders are not yet supported",
1609 ))
1610 }
1611
1612 pub(crate) async fn for_each_entries<
1613 F: for<'a> FnMut(
1614 &'a ArchiveEntry,
1615 &'a mut (dyn AsyncRead + Unpin + Send + 'a),
1616 ) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + Send + 'a>>,
1617 >(
1618 &mut self,
1619 mut each: F,
1620 ) -> Result<(), Error> {
1621 let block_count = self.archive.blocks.len();
1622 for block_index in 0..block_count {
1623 let forder_dec = BlockDecoder::new(
1624 self.thread_count,
1625 block_index,
1626 &self.archive,
1627 &self.password,
1628 &mut self.source,
1629 );
1630 if !forder_dec
1631 .for_each_entries(&mut each)
1632 .await
1633 .map_err(|e| e.maybe_bad_password(!self.password.is_empty()))?
1634 {
1635 return Ok(());
1636 }
1637 }
1638 for file_index in 0..self.archive.files.len() {
1640 let block_index = self.archive.stream_map.file_block_index[file_index];
1641 if block_index.is_none() {
1642 let file = &self.archive.files[file_index];
1643 let mut empty_reader = AsyncStdRead::new([0u8; 0].as_slice());
1644 if !each(file, &mut empty_reader).await? {
1645 return Ok(());
1646 }
1647 }
1648 }
1649 Ok(())
1650 }
1651
1652 pub async fn read_file(&mut self, name: &str) -> Result<Vec<u8>, Error> {
1658 let index_entry = *self.index.get(name).ok_or(Error::FileNotFound)?;
1659 let file = &self.archive.files[index_entry.file_index];
1660
1661 if !file.has_stream {
1662 return Ok(Vec::new());
1663 }
1664
1665 let block_index = index_entry
1666 .block_index
1667 .ok_or_else(|| Error::other("File has no associated block"))?;
1668
1669 match self.archive.is_solid {
1670 true => {
1671 use std::sync::{Arc, Mutex};
1672 let result_cell = Arc::new(Mutex::new(None));
1673 let target_name = name.to_string();
1674
1675 BlockDecoder::new(
1676 self.thread_count,
1677 block_index,
1678 &self.archive,
1679 &self.password,
1680 &mut self.source,
1681 )
1682 .for_each_entries(&mut |archive_entry, reader| {
1683 let result_cell = Arc::clone(&result_cell);
1684 let target_name = target_name.clone();
1685 Box::pin(async move {
1686 let mut data = Vec::with_capacity(archive_entry.size as usize);
1687 AsyncReadExt::read_to_end(reader, &mut data).await?;
1688 if archive_entry.name == target_name {
1689 *result_cell.lock().unwrap() = Some(data);
1690 Ok(false)
1691 } else {
1692 Ok(true)
1693 }
1694 })
1695 })
1696 .await?;
1697
1698 let mut guard = result_cell.lock().unwrap();
1699 guard.take().ok_or(Error::FileNotFound)
1700 }
1701 false => {
1702 let pack_index = self.archive.stream_map.block_first_pack_stream_index[block_index];
1703 let pack_offset = self.archive.stream_map.pack_stream_offsets[pack_index];
1704 let block_offset = SIGNATURE_HEADER_SIZE + self.archive.pack_pos + pack_offset;
1705
1706 AsyncSeekExt::seek(&mut self.source, SeekFrom::Start(block_offset)).await?;
1707
1708 let (mut block_reader, _size) = Self::build_decode_stack(
1709 &mut self.source,
1710 &self.archive,
1711 block_index,
1712 &self.password,
1713 self.thread_count,
1714 )
1715 .await?;
1716
1717 let mut data = Vec::with_capacity(file.size as usize);
1718 let mut decoder: Box<dyn AsyncRead + Unpin + Send> =
1719 Box::new(BoundedReader::new(&mut block_reader, file.size as usize));
1720
1721 if file.has_crc {
1722 decoder = Box::new(Crc32VerifyingReader::new(
1723 decoder,
1724 file.size as usize,
1725 file.crc,
1726 ));
1727 }
1728
1729 AsyncReadExt::read_to_end(&mut decoder, &mut data).await?;
1730
1731 Ok(data)
1732 }
1733 }
1734 }
1735
1736 pub fn file_compression_methods(
1738 &self,
1739 file_name: &str,
1740 methods: &mut Vec<EncoderMethod>,
1741 ) -> Result<(), Error> {
1742 let index_entry = self.index.get(file_name).ok_or(Error::FileNotFound)?;
1743 let file = &self.archive.files[index_entry.file_index];
1744
1745 if !file.has_stream {
1746 return Ok(());
1747 }
1748
1749 let block_index = index_entry
1750 .block_index
1751 .ok_or_else(|| Error::other("File has no associated block"))?;
1752
1753 let block = self
1754 .archive
1755 .blocks
1756 .get(block_index)
1757 .ok_or_else(|| Error::other("Block not found"))?;
1758
1759 block
1760 .coders
1761 .iter()
1762 .filter_map(|coder| EncoderMethod::by_id(coder.encoder_method_id()))
1763 .for_each(|method| {
1764 methods.push(method);
1765 });
1766
1767 Ok(())
1768 }
1769}
1770
1771pub struct BlockDecoder<'a, R: AsyncRead + AsyncSeek + Unpin> {
1776 thread_count: u32,
1777 block_index: usize,
1778 archive: &'a Archive,
1779 password: &'a Password,
1780 source: &'a mut R,
1781}
1782
1783impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send> BlockDecoder<'a, R> {
1784 pub fn new(
1794 thread_count: u32,
1795 block_index: usize,
1796 archive: &'a Archive,
1797 password: &'a Password,
1798 source: &'a mut R,
1799 ) -> Self {
1800 Self {
1801 thread_count,
1802 block_index,
1803 archive,
1804 password,
1805 source,
1806 }
1807 }
1808
1809 pub fn set_thread_count(&mut self, thread_count: u32) {
1812 self.thread_count = thread_count.clamp(1, 256);
1813 }
1814
1815 pub fn entries(&self) -> &[ArchiveEntry] {
1819 let start = self.archive.stream_map.block_first_file_index[self.block_index];
1820 let file_count = self.archive.blocks[self.block_index].num_unpack_sub_streams;
1821 &self.archive.files[start..(file_count + start)]
1822 }
1823
1824 pub fn entry_count(&self) -> usize {
1826 self.archive.blocks[self.block_index].num_unpack_sub_streams
1827 }
1828
1829 pub async fn for_each_entries<
1836 F: for<'b> FnMut(
1837 &'b ArchiveEntry,
1838 &'b mut (dyn AsyncRead + Unpin + Send + 'b),
1839 ) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + Send + 'b>>,
1840 >(
1841 self,
1842 each: &mut F,
1843 ) -> Result<bool, Error> {
1844 let Self {
1845 thread_count,
1846 block_index,
1847 archive,
1848 password,
1849 source,
1850 } = self;
1851 let (mut block_reader, _size) =
1852 ArchiveReader::build_decode_stack(source, archive, block_index, password, thread_count)
1853 .await?;
1854 let start = archive.stream_map.block_first_file_index[block_index];
1855 let file_count = archive.blocks[block_index].num_unpack_sub_streams;
1856
1857 for file_index in start..(file_count + start) {
1858 let file = &archive.files[file_index];
1859 if file.has_stream && file.size > 0 {
1860 let mut decoder: Box<dyn AsyncRead + Unpin + Send> =
1861 Box::new(BoundedReader::new(&mut block_reader, file.size as usize));
1862 if file.has_crc {
1863 decoder = Box::new(Crc32VerifyingReader::new(
1864 decoder,
1865 file.size as usize,
1866 file.crc,
1867 ));
1868 }
1869 {
1870 let cont = each(file, &mut decoder)
1871 .await
1872 .map_err(|e| e.maybe_bad_password(!password.is_empty()))?;
1873 if !cont {
1874 return Ok(false);
1875 }
1876 }
1877 } else {
1878 let mut empty_reader = AsyncStdRead::new([0u8; 0].as_slice());
1879 if !each(file, &mut empty_reader).await? {
1880 return Ok(false);
1881 }
1882 }
1883 }
1884 Ok(true)
1885 }
1886}