1use std::{
2 cell::RefCell, collections::HashMap, future::Future, io, num::NonZeroUsize, pin::Pin, rc::Rc,
3};
4
5use futures::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, SeekFrom};
6
7use async_fs as afs;
8use crc32fast::Hasher;
9use lzma_rust2::filter::bcj2::Bcj2Reader;
10
11use crate::{
12 Password,
13 archive::*,
14 bitset::BitSet,
15 block::*,
16 decoder::{AsyncStdRead, add_decoder},
17 error::Error,
18};
19
/// Memory limit passed to `add_decoder` when decoding an encoded header.
///
/// NOTE(review): the name suggests the unit is KiB; `usize::MAX / 1024` is the
/// largest KiB count whose byte equivalent still fits in `usize`, so this is
/// presumably meant as "effectively unlimited" — confirm against `add_decoder`.
const MAX_MEM_LIMIT_KB: usize = usize::MAX / 1024;
21
22pub struct BoundedReader<R: AsyncRead + Unpin> {
23 inner: R,
24 remain: usize,
25}
26
27impl<R: AsyncRead + Unpin> BoundedReader<R> {
28 pub fn new(inner: R, max_size: usize) -> Self {
29 Self {
30 inner,
31 remain: max_size,
32 }
33 }
34}
35
36impl<R: AsyncRead + Unpin> futures::io::AsyncRead for BoundedReader<R> {
37 fn poll_read(
38 mut self: std::pin::Pin<&mut Self>,
39 cx: &mut std::task::Context<'_>,
40 buf: &mut [u8],
41 ) -> std::task::Poll<std::io::Result<usize>> {
42 if self.remain == 0 {
43 return std::task::Poll::Ready(Ok(0));
44 }
45 let bound = buf.len().min(self.remain);
46 let poll = std::pin::Pin::new(&mut self.inner).poll_read(cx, &mut buf[..bound]);
47 if let std::task::Poll::Ready(Ok(size)) = &poll {
48 self.remain -= *size;
49 }
50 poll
51 }
52}
53
54#[derive(Debug)]
57pub(crate) struct SharedBoundedReader<'a, R> {
58 inner: Rc<RefCell<&'a mut R>>,
59 cur: u64,
60 bounds: (u64, u64),
61}
62
63impl<'a, R> Clone for SharedBoundedReader<'a, R> {
64 fn clone(&self) -> Self {
65 Self {
66 inner: Rc::clone(&self.inner),
67 cur: self.cur,
68 bounds: self.bounds,
69 }
70 }
71}
72
73impl<'a, R: AsyncRead + AsyncSeek + Unpin> futures::io::AsyncRead for SharedBoundedReader<'a, R> {
74 fn poll_read(
75 mut self: std::pin::Pin<&mut Self>,
76 cx: &mut std::task::Context<'_>,
77 buf: &mut [u8],
78 ) -> std::task::Poll<std::io::Result<usize>> {
79 if self.cur >= self.bounds.1 {
80 return std::task::Poll::Ready(Ok(0));
81 }
82 let cur = self.cur;
83 let mut inner = self.inner.borrow_mut();
84 match std::pin::Pin::new(&mut *inner).poll_seek(cx, SeekFrom::Start(cur)) {
85 std::task::Poll::Pending => return std::task::Poll::Pending,
86 std::task::Poll::Ready(Ok(_)) => {}
87 std::task::Poll::Ready(Err(e)) => return std::task::Poll::Ready(Err(e)),
88 }
89 let bound = buf.len().min((self.bounds.1 - cur) as usize);
90 let poll = std::pin::Pin::new(&mut *inner).poll_read(cx, &mut buf[..bound]);
91 drop(inner);
92 match poll {
93 std::task::Poll::Pending => std::task::Poll::Pending,
94 std::task::Poll::Ready(Ok(size)) => {
95 self.cur += size as u64;
96 std::task::Poll::Ready(Ok(size))
97 }
98 std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)),
99 }
100 }
101}
102
103impl<'a, R: AsyncRead + AsyncSeek + Unpin> SharedBoundedReader<'a, R> {
104 fn new(inner: Rc<RefCell<&'a mut R>>, bounds: (u64, u64)) -> Self {
105 Self {
106 inner,
107 cur: bounds.0,
108 bounds,
109 }
110 }
111}
112
113struct Crc32VerifyingReader<R> {
114 inner: R,
115 crc_digest: Hasher,
116 expected_value: u64,
117 remaining: i64,
118}
119
120impl<R> Crc32VerifyingReader<R> {
121 fn new(inner: R, remaining: usize, expected_value: u64) -> Self {
122 Self {
123 inner,
124 crc_digest: Hasher::new(),
125 expected_value,
126 remaining: remaining as i64,
127 }
128 }
129}
130
131impl<R: futures::io::AsyncRead + Unpin> futures::io::AsyncRead for Crc32VerifyingReader<R> {
134 fn poll_read(
135 mut self: std::pin::Pin<&mut Self>,
136 cx: &mut std::task::Context<'_>,
137 buf: &mut [u8],
138 ) -> std::task::Poll<std::io::Result<usize>> {
139 if self.remaining <= 0 {
140 return std::task::Poll::Ready(Ok(0));
141 }
142 let poll = std::pin::Pin::new(&mut self.inner).poll_read(cx, buf);
143 if let std::task::Poll::Ready(Ok(size)) = poll {
144 if size > 0 {
145 self.remaining -= size as i64;
146 self.crc_digest.update(&buf[..size]);
147 }
148 if self.remaining <= 0 {
149 let d = std::mem::replace(&mut self.crc_digest, Hasher::new()).finalize();
150 if d as u64 != self.expected_value {
151 return std::task::Poll::Ready(Err(std::io::Error::other(
152 Error::ChecksumVerificationFailed,
153 )));
154 }
155 }
156 }
157 poll
158 }
159}
160
161impl Archive {
162 #[cfg(not(target_arch = "wasm32"))]
166 pub async fn open(path: impl AsRef<std::path::Path>) -> Result<Archive, Error> {
167 let data = afs::read(path.as_ref())
168 .await
169 .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
170 let mut cursor = futures::io::Cursor::new(data);
171 Self::read(&mut cursor, &Password::empty()).await
172 }
173
174 #[cfg(not(target_arch = "wasm32"))]
178 pub async fn open_with_password(
179 path: impl AsRef<std::path::Path>,
180 password: &Password,
181 ) -> Result<Archive, Error> {
182 let data = afs::read(path.as_ref())
183 .await
184 .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
185 let mut cursor = futures::io::Cursor::new(data);
186 Self::read(&mut cursor, password).await
187 }
188
189 pub(crate) async fn read<R: AsyncRead + AsyncSeek + Unpin>(
208 reader: &mut R,
209 password: &Password,
210 ) -> Result<Archive, Error> {
211 let reader_len = AsyncSeekExt::seek(reader, SeekFrom::End(0)).await?;
212 AsyncSeekExt::seek(reader, SeekFrom::Start(0)).await?;
213
214 let mut signature = [0; 6];
215 AsyncReadExt::read_exact(reader, &mut signature).await?;
216 if signature != SEVEN_Z_SIGNATURE {
217 return Err(Error::BadSignature(signature));
218 }
219 let mut versions = [0; 2];
220 AsyncReadExt::read_exact(reader, &mut versions).await?;
221 let version_major = versions[0];
222 let version_minor = versions[1];
223 if version_major != 0 {
224 return Err(Error::UnsupportedVersion {
225 major: version_major,
226 minor: version_minor,
227 });
228 }
229
230 let start_header_crc = {
231 let mut buf = [0u8; 4];
232 AsyncReadExt::read_exact(reader, &mut buf).await?;
233 u32::from_le_bytes(buf)
234 };
235
236 let header_valid = if start_header_crc == 0 {
237 let current_position = AsyncSeekExt::stream_position(reader).await?;
238 let mut buf = [0; 20];
239 AsyncReadExt::read_exact(reader, &mut buf).await?;
240 AsyncSeekExt::seek(reader, SeekFrom::Start(current_position)).await?;
241 buf.iter().any(|a| *a != 0)
242 } else {
243 true
244 };
245 if header_valid {
246 let start_header = Self::read_start_header(reader, start_header_crc).await?;
247 Self::init_archive(reader, start_header, password, true, 1).await
248 } else {
249 Self::try_to_locale_end_header(reader, reader_len, password, 1).await
250 }
251 }
252
253 async fn read_start_header<R: AsyncRead + Unpin>(
254 reader: &mut R,
255 start_header_crc: u32,
256 ) -> Result<StartHeader, Error> {
257 let mut buf = [0; 20];
258 AsyncReadExt::read_exact(reader, &mut buf).await?;
259 let crc32 = crc32fast::hash(&buf);
260 if crc32 != start_header_crc {
261 return Err(Error::ChecksumVerificationFailed);
262 }
263 let offset = u64::from_le_bytes(buf[0..8].try_into().unwrap());
264 let size = u64::from_le_bytes(buf[8..16].try_into().unwrap());
265 let crc = u32::from_le_bytes(buf[16..20].try_into().unwrap());
266 Ok(StartHeader {
267 next_header_offset: offset,
268 next_header_size: size,
269 next_header_crc: crc as u64,
270 })
271 }
272
273 async fn read_header<R: AsyncRead + AsyncSeek + Unpin>(
274 header: &mut R,
275 archive: &mut Archive,
276 ) -> Result<(), Error> {
277 let mut nid = {
278 let mut b = [0u8; 1];
279 AsyncReadExt::read_exact(header, &mut b).await?;
280 b[0]
281 };
282 if nid == K_ARCHIVE_PROPERTIES {
283 Self::read_archive_properties(header).await?;
284 nid = {
285 let mut b = [0u8; 1];
286 AsyncReadExt::read_exact(header, &mut b).await?;
287 b[0]
288 };
289 }
290
291 if nid == K_ADDITIONAL_STREAMS_INFO {
292 return Err(Error::other("Additional streams unsupported"));
293 }
294 if nid == K_MAIN_STREAMS_INFO {
295 Self::read_streams_info(header, archive).await?;
296 nid = {
297 let mut b = [0u8; 1];
298 AsyncReadExt::read_exact(header, &mut b).await?;
299 b[0]
300 };
301 }
302 if nid == K_FILES_INFO {
303 Self::read_files_info(header, archive).await?;
304 nid = {
305 let mut b = [0u8; 1];
306 AsyncReadExt::read_exact(header, &mut b).await?;
307 b[0]
308 };
309 }
310 if nid != K_END {
311 return Err(Error::BadTerminatedHeader(nid));
312 }
313
314 Ok(())
315 }
316
317 async fn read_archive_properties<R: AsyncRead + AsyncSeek + Unpin>(
318 header: &mut R,
319 ) -> Result<(), Error> {
320 let mut nid = {
321 let mut b = [0u8; 1];
322 AsyncReadExt::read_exact(header, &mut b).await?;
323 b[0]
324 };
325 while nid != K_END {
326 let property_size = read_variable_usize(header, "propertySize").await?;
327 AsyncSeekExt::seek(header, SeekFrom::Current(property_size as i64)).await?;
328 nid = {
329 let mut b = [0u8; 1];
330 AsyncReadExt::read_exact(header, &mut b).await?;
331 b[0]
332 };
333 }
334 Ok(())
335 }
336
337 async fn try_to_locale_end_header<R: AsyncRead + AsyncSeek + Unpin>(
338 reader: &mut R,
339 reader_len: u64,
340 password: &Password,
341 thread_count: u32,
342 ) -> Result<Self, Error> {
343 let search_limit = 1024 * 1024;
344 let prev_data_size = AsyncSeekExt::stream_position(reader).await? + 20;
345 let size = reader_len;
346 let cur_pos = AsyncSeekExt::stream_position(reader).await?;
347 let min_pos = if cur_pos + search_limit > size {
348 cur_pos
349 } else {
350 size - search_limit
351 };
352 let mut pos = reader_len - 1;
353 while pos > min_pos {
354 pos -= 1;
355
356 AsyncSeekExt::seek(reader, SeekFrom::Start(pos)).await?;
357 let nid = {
358 let mut buf = [0u8; 1];
359 AsyncReadExt::read_exact(reader, &mut buf).await?;
360 buf[0]
361 };
362 if nid == K_ENCODED_HEADER || nid == K_HEADER {
363 let start_header = StartHeader {
364 next_header_offset: pos - prev_data_size,
365 next_header_size: reader_len - pos,
366 next_header_crc: 0,
367 };
368 let result =
369 Self::init_archive(reader, start_header, password, false, thread_count).await?;
370
371 if !result.files.is_empty() {
372 return Ok(result);
373 }
374 }
375 }
376 Err(Error::other(
377 "Start header corrupt and unable to guess end header",
378 ))
379 }
380
381 async fn init_archive<R: AsyncRead + AsyncSeek + Unpin>(
382 reader: &mut R,
383 start_header: StartHeader,
384 password: &Password,
385 verify_crc: bool,
386 thread_count: u32,
387 ) -> Result<Self, Error> {
388 if start_header.next_header_size > usize::MAX as u64 {
389 return Err(Error::other(format!(
390 "Cannot handle next_header_size {}",
391 start_header.next_header_size
392 )));
393 }
394
395 let next_header_size_int = start_header.next_header_size as usize;
396
397 AsyncSeekExt::seek(
398 reader,
399 SeekFrom::Start(SIGNATURE_HEADER_SIZE + start_header.next_header_offset),
400 )
401 .await?;
402
403 let mut buf = vec![0; next_header_size_int];
404 AsyncReadExt::read_exact(reader, &mut buf).await?;
405 if verify_crc && crc32fast::hash(&buf) as u64 != start_header.next_header_crc {
406 return Err(Error::NextHeaderCrcMismatch);
407 }
408
409 let mut archive = Archive::default();
410 let nid = buf.first().copied().unwrap_or(0);
411 if nid == K_ENCODED_HEADER {
412 let mut cursor = futures::io::Cursor::new(&buf[1..]);
413 let (mut out_reader, buf_size) = Self::read_encoded_header(
414 &mut cursor,
415 reader,
416 &mut archive,
417 password,
418 thread_count,
419 )
420 .await?;
421 buf.clear();
422 buf.resize(buf_size, 0);
423 AsyncReadExt::read_exact(&mut out_reader, &mut buf)
424 .await
425 .map_err(|e| Error::bad_password(e, !password.is_empty()))?;
426 archive = Archive::default();
427 }
428 let nid = buf.first().copied().unwrap_or(0);
429 if nid == K_HEADER {
430 let mut header = futures::io::Cursor::new(&buf[1..]);
431 Self::read_header(&mut header, &mut archive).await?;
432 } else {
433 return Err(Error::other("Broken or unsupported archive: no Header"));
434 }
435
436 archive.is_solid = archive
437 .blocks
438 .iter()
439 .any(|block| block.num_unpack_sub_streams > 1);
440
441 Ok(archive)
442 }
443
444 async fn read_encoded_header<'r, RI: 'r + AsyncRead + AsyncSeek + Unpin>(
445 header: &mut (impl AsyncRead + Unpin),
446 reader: &'r mut RI,
447 archive: &mut Archive,
448 password: &Password,
449 thread_count: u32,
450 ) -> Result<(Box<dyn futures::io::AsyncRead + Unpin + 'r>, usize), Error> {
451 Self::read_streams_info(header, archive).await?;
452 let block = archive
453 .blocks
454 .first()
455 .ok_or(Error::other("no blocks, can't read encoded header"))?;
456 let first_pack_stream_index = 0;
457 let block_offset = SIGNATURE_HEADER_SIZE + archive.pack_pos;
458 if archive.pack_sizes.is_empty() {
459 return Err(Error::other("no packed streams, can't read encoded header"));
460 }
461
462 AsyncSeekExt::seek(reader, SeekFrom::Start(block_offset)).await?;
463 let coder_len = block.coders.len();
464 let unpack_size = block.get_unpack_size() as usize;
465 let pack_size = archive.pack_sizes[first_pack_stream_index] as usize;
466 let mut decoder: Box<dyn futures::io::AsyncRead + Unpin> =
467 Box::new(BoundedReader::new(reader, pack_size));
468 let mut decoder = if coder_len > 0 {
469 for (index, coder) in block.ordered_coder_iter() {
470 if coder.num_in_streams != 1 || coder.num_out_streams != 1 {
471 return Err(Error::other(
472 "Multi input/output stream coders are not yet supported",
473 ));
474 }
475 let next = add_decoder(
476 decoder,
477 block.get_unpack_size_at_index(index) as usize,
478 coder,
479 password,
480 MAX_MEM_LIMIT_KB,
481 thread_count,
482 )
483 .await?;
484 decoder = Box::new(next);
485 }
486 decoder
487 } else {
488 decoder
489 };
490 if block.has_crc {
491 decoder = Box::new(Crc32VerifyingReader::new(decoder, unpack_size, block.crc));
492 }
493
494 Ok((decoder, unpack_size))
495 }
496
497 async fn read_streams_info<R: AsyncRead + Unpin>(
498 header: &mut R,
499 archive: &mut Archive,
500 ) -> Result<(), Error> {
501 let mut nid = {
502 let mut b = [0u8; 1];
503 AsyncReadExt::read_exact(header, &mut b).await?;
504 b[0]
505 };
506 if nid == K_PACK_INFO {
507 Self::read_pack_info(header, archive).await?;
508 nid = {
509 let mut b = [0u8; 1];
510 AsyncReadExt::read_exact(header, &mut b).await?;
511 b[0]
512 };
513 }
514
515 if nid == K_UNPACK_INFO {
516 Self::read_unpack_info(header, archive).await?;
517 nid = {
518 let mut b = [0u8; 1];
519 AsyncReadExt::read_exact(header, &mut b).await?;
520 b[0]
521 };
522 } else {
523 archive.blocks.clear();
524 }
525 if nid == K_SUB_STREAMS_INFO {
526 Self::read_sub_streams_info(header, archive).await?;
527 nid = {
528 let mut b = [0u8; 1];
529 AsyncReadExt::read_exact(header, &mut b).await?;
530 b[0]
531 };
532 }
533 if nid != K_END {
534 return Err(Error::BadTerminatedStreamsInfo(nid));
535 }
536
537 Ok(())
538 }
539
540 async fn read_files_info<R: AsyncRead + AsyncSeek + Unpin>(
541 header: &mut R,
542 archive: &mut Archive,
543 ) -> Result<(), Error> {
544 let num_files = read_variable_usize(header, "num files").await?;
545 let mut files: Vec<ArchiveEntry> = vec![Default::default(); num_files];
546
547 let mut is_empty_stream: Option<BitSet> = None;
548 let mut is_empty_file: Option<BitSet> = None;
549 let mut is_anti: Option<BitSet> = None;
550 loop {
551 let prop_type = {
552 let mut b = [0u8; 1];
553 AsyncReadExt::read_exact(header, &mut b).await?;
554 b[0]
555 };
556 if prop_type == 0 {
557 break;
558 }
559 let size = read_variable_u64(header).await?;
560 match prop_type {
561 K_EMPTY_STREAM => {
562 is_empty_stream = Some(read_bits(header, num_files).await?);
563 }
564 K_EMPTY_FILE => {
565 let n = if let Some(s) = &is_empty_stream {
566 s.len()
567 } else {
568 return Err(Error::other(
569 "Header format error: kEmptyStream must appear before kEmptyFile",
570 ));
571 };
572 is_empty_file = Some(read_bits(header, n).await?);
573 }
574 K_ANTI => {
575 let n = if let Some(s) = is_empty_stream.as_ref() {
576 s.len()
577 } else {
578 return Err(Error::other(
579 "Header format error: kEmptyStream must appear before kEmptyFile",
580 ));
581 };
582 is_anti = Some(read_bits(header, n).await?);
583 }
584 K_NAME => {
585 let external = {
586 let mut b = [0u8; 1];
587 AsyncReadExt::read_exact(header, &mut b).await?;
588 b[0]
589 };
590 if external != 0 {
591 return Err(Error::other("Not implemented:external != 0"));
592 }
593 if (size - 1) & 1 != 0 {
594 return Err(Error::other("file names length invalid"));
595 }
596
597 let size = assert_usize(size, "file names length")?;
598 let mut next_file = 0;
599 let mut read_bytes = 0usize;
600 let mut cache: Vec<u16> = Vec::with_capacity(16);
601 let mut buf2 = [0u8; 2];
602 while read_bytes < size - 1 {
603 AsyncReadExt::read_exact(header, &mut buf2).await?;
604 read_bytes += 2;
605 let u = u16::from_le_bytes(buf2);
606 if u == 0 {
607 let s = String::from_utf16(&cache)
608 .map_err(|e| Error::other(e.to_string()))?;
609 files[next_file].name = s;
610 next_file += 1;
611 cache.clear();
612 } else {
613 cache.push(u);
614 }
615 }
616 if next_file != files.len() {
617 return Err(Error::other("Error parsing file names"));
618 }
619 }
620 K_C_TIME => {
621 let times_defined = read_all_or_bits(header, num_files).await?;
622 let external = {
623 let mut b = [0u8; 1];
624 AsyncReadExt::read_exact(header, &mut b).await?;
625 b[0]
626 };
627 if external != 0 {
628 return Err(Error::other(format!(
629 "kCTime Unimplemented:external={external}"
630 )));
631 }
632 for (i, file) in files.iter_mut().enumerate() {
633 file.has_creation_date = times_defined.contains(i);
634 if file.has_creation_date {
635 let mut b8 = [0u8; 8];
636 AsyncReadExt::read_exact(header, &mut b8).await?;
637 file.creation_date = u64::from_le_bytes(b8).into();
638 }
639 }
640 }
641 K_A_TIME => {
642 let times_defined = read_all_or_bits(header, num_files).await?;
643 let external = {
644 let mut b = [0u8; 1];
645 AsyncReadExt::read_exact(header, &mut b).await?;
646 b[0]
647 };
648 if external != 0 {
649 return Err(Error::other(format!(
650 "kATime Unimplemented:external={external}"
651 )));
652 }
653 for (i, file) in files.iter_mut().enumerate() {
654 file.has_access_date = times_defined.contains(i);
655 if file.has_access_date {
656 let mut b8 = [0u8; 8];
657 AsyncReadExt::read_exact(header, &mut b8).await?;
658 file.access_date = u64::from_le_bytes(b8).into();
659 }
660 }
661 }
662 K_M_TIME => {
663 let times_defined = read_all_or_bits(header, num_files).await?;
664 let external = {
665 let mut b = [0u8; 1];
666 AsyncReadExt::read_exact(header, &mut b).await?;
667 b[0]
668 };
669 if external != 0 {
670 return Err(Error::other(format!(
671 "kMTime Unimplemented:external={external}"
672 )));
673 }
674 for (i, file) in files.iter_mut().enumerate() {
675 file.has_last_modified_date = times_defined.contains(i);
676 if file.has_last_modified_date {
677 let mut b8 = [0u8; 8];
678 AsyncReadExt::read_exact(header, &mut b8).await?;
679 file.last_modified_date = u64::from_le_bytes(b8).into();
680 }
681 }
682 }
683 K_WIN_ATTRIBUTES => {
684 let times_defined = read_all_or_bits(header, num_files).await?;
685 let external = {
686 let mut b = [0u8; 1];
687 AsyncReadExt::read_exact(header, &mut b).await?;
688 b[0]
689 };
690 if external != 0 {
691 return Err(Error::other(format!(
692 "kWinAttributes Unimplemented:external={external}"
693 )));
694 }
695 for (i, file) in files.iter_mut().enumerate() {
696 file.has_windows_attributes = times_defined.contains(i);
697 if file.has_windows_attributes {
698 let mut b4 = [0u8; 4];
699 AsyncReadExt::read_exact(header, &mut b4).await?;
700 file.windows_attributes = u32::from_le_bytes(b4);
701 }
702 }
703 }
704 K_START_POS => return Err(Error::other("kStartPos is unsupported, please report")),
705 K_DUMMY => {
706 AsyncSeekExt::seek(header, SeekFrom::Current(size as i64)).await?;
707 }
708 _ => {
709 AsyncSeekExt::seek(header, SeekFrom::Current(size as i64)).await?;
710 }
711 };
712 }
713
714 let mut non_empty_file_counter = 0;
715 let mut empty_file_counter = 0;
716 for (i, file) in files.iter_mut().enumerate() {
717 file.has_stream = is_empty_stream
718 .as_ref()
719 .map(|s| !s.contains(i))
720 .unwrap_or(true);
721 if file.has_stream {
722 let sub_stream_info = if let Some(s) = archive.sub_streams_info.as_ref() {
723 s
724 } else {
725 return Err(Error::other(
726 "Archive contains file with streams but no subStreamsInfo",
727 ));
728 };
729 file.is_directory = false;
730 file.is_anti_item = false;
731 file.has_crc = sub_stream_info.has_crc.contains(non_empty_file_counter);
732 file.crc = sub_stream_info.crcs[non_empty_file_counter];
733 file.size = sub_stream_info.unpack_sizes[non_empty_file_counter];
734 non_empty_file_counter += 1;
735 } else {
736 file.is_directory = if let Some(s) = &is_empty_file {
737 !s.contains(empty_file_counter)
738 } else {
739 true
740 };
741 file.is_anti_item = is_anti
742 .as_ref()
743 .map(|s| s.contains(empty_file_counter))
744 .unwrap_or(false);
745 file.has_crc = false;
746 file.size = 0;
747 empty_file_counter += 1;
748 }
749 }
750 archive.files = files;
751
752 Self::calculate_stream_map(archive)?;
753 Ok(())
754 }
755
756 fn calculate_stream_map(archive: &mut Archive) -> Result<(), Error> {
757 let mut stream_map = StreamMap::default();
758
759 let mut next_block_pack_stream_index = 0;
760 let num_blocks = archive.blocks.len();
761 stream_map.block_first_pack_stream_index = vec![0; num_blocks];
762 for i in 0..num_blocks {
763 stream_map.block_first_pack_stream_index[i] = next_block_pack_stream_index;
764 next_block_pack_stream_index += archive.blocks[i].packed_streams.len();
765 }
766
767 let mut next_pack_stream_offset = 0;
768 let num_pack_sizes = archive.pack_sizes.len();
769 stream_map.pack_stream_offsets = vec![0; num_pack_sizes];
770 for i in 0..num_pack_sizes {
771 stream_map.pack_stream_offsets[i] = next_pack_stream_offset;
772 next_pack_stream_offset += archive.pack_sizes[i];
773 }
774
775 stream_map.block_first_file_index = vec![0; num_blocks];
776 stream_map.file_block_index = vec![None; archive.files.len()];
777 let mut next_block_index = 0;
778 let mut next_block_unpack_stream_index = 0;
779 for i in 0..archive.files.len() {
780 if !archive.files[i].has_stream && next_block_unpack_stream_index == 0 {
781 stream_map.file_block_index[i] = None;
782 continue;
783 }
784 if next_block_unpack_stream_index == 0 {
785 while next_block_index < archive.blocks.len() {
786 stream_map.block_first_file_index[next_block_index] = i;
787 if archive.blocks[next_block_index].num_unpack_sub_streams > 0 {
788 break;
789 }
790 next_block_index += 1;
791 }
792 if next_block_index >= archive.blocks.len() {
793 return Err(Error::other("Too few blocks in archive"));
794 }
795 }
796 stream_map.file_block_index[i] = Some(next_block_index);
797 if !archive.files[i].has_stream {
798 continue;
799 }
800
801 if stream_map.block_first_file_index[next_block_index] == i {
803 let first_pack_stream_index =
804 stream_map.block_first_pack_stream_index[next_block_index];
805 let pack_size = archive.pack_sizes[first_pack_stream_index];
806
807 archive.files[i].compressed_size = pack_size;
808 }
809
810 next_block_unpack_stream_index += 1;
811 if next_block_unpack_stream_index
812 >= archive.blocks[next_block_index].num_unpack_sub_streams
813 {
814 next_block_index += 1;
815 next_block_unpack_stream_index = 0;
816 }
817 }
818
819 archive.stream_map = stream_map;
820 Ok(())
821 }
822
823 async fn read_pack_info<R: AsyncRead + Unpin>(
824 header: &mut R,
825 archive: &mut Archive,
826 ) -> Result<(), Error> {
827 archive.pack_pos = read_variable_u64(header).await?;
828 let num_pack_streams = read_variable_usize(header, "num pack streams").await?;
829 let mut nid = {
830 let mut b = [0u8; 1];
831 AsyncReadExt::read_exact(header, &mut b).await?;
832 b[0]
833 };
834 if nid == K_SIZE {
835 archive.pack_sizes = vec![0u64; num_pack_streams];
836 for i in 0..archive.pack_sizes.len() {
837 archive.pack_sizes[i] = read_variable_u64(header).await?;
838 }
839 nid = {
840 let mut b = [0u8; 1];
841 AsyncReadExt::read_exact(header, &mut b).await?;
842 b[0]
843 };
844 }
845
846 if nid == K_CRC {
847 archive.pack_crcs_defined = read_all_or_bits(header, num_pack_streams).await?;
848 archive.pack_crcs = vec![0; num_pack_streams];
849 for i in 0..num_pack_streams {
850 if archive.pack_crcs_defined.contains(i) {
851 let mut b4 = [0u8; 4];
852 AsyncReadExt::read_exact(header, &mut b4).await?;
853 archive.pack_crcs[i] = u32::from_le_bytes(b4) as u64;
854 }
855 }
856 nid = {
857 let mut b = [0u8; 1];
858 AsyncReadExt::read_exact(header, &mut b).await?;
859 b[0]
860 };
861 }
862
863 if nid != K_END {
864 return Err(Error::BadTerminatedPackInfo(nid));
865 }
866
867 Ok(())
868 }
869 async fn read_unpack_info<R: AsyncRead + Unpin>(
870 header: &mut R,
871 archive: &mut Archive,
872 ) -> Result<(), Error> {
873 let nid = {
874 let mut b = [0u8; 1];
875 AsyncReadExt::read_exact(header, &mut b).await?;
876 b[0]
877 };
878 if nid != K_FOLDER {
879 return Err(Error::other(format!("Expected kFolder, got {nid}")));
880 }
881 let num_blocks = read_variable_usize(header, "num blocks").await?;
882
883 archive.blocks.reserve_exact(num_blocks);
884 let external = {
885 let mut b = [0u8; 1];
886 AsyncReadExt::read_exact(header, &mut b).await?;
887 b[0]
888 };
889 if external != 0 {
890 return Err(Error::ExternalUnsupported);
891 }
892
893 for _ in 0..num_blocks {
894 archive.blocks.push(Self::read_block(header).await?);
895 }
896
897 let nid = {
898 let mut b = [0u8; 1];
899 AsyncReadExt::read_exact(header, &mut b).await?;
900 b[0]
901 };
902 if nid != K_CODERS_UNPACK_SIZE {
903 return Err(Error::other(format!(
904 "Expected kCodersUnpackSize, got {nid}"
905 )));
906 }
907
908 for block in archive.blocks.iter_mut() {
909 let tos = block.total_output_streams;
910 block.unpack_sizes.reserve_exact(tos);
911 for _ in 0..tos {
912 block.unpack_sizes.push(read_variable_u64(header).await?);
913 }
914 }
915
916 let mut nid = {
917 let mut b = [0u8; 1];
918 AsyncReadExt::read_exact(header, &mut b).await?;
919 b[0]
920 };
921 if nid == K_CRC {
922 let crcs_defined = read_all_or_bits(header, num_blocks).await?;
923 for i in 0..num_blocks {
924 if crcs_defined.contains(i) {
925 archive.blocks[i].has_crc = true;
926 let mut b4 = [0u8; 4];
927 AsyncReadExt::read_exact(header, &mut b4).await?;
928 archive.blocks[i].crc = u32::from_le_bytes(b4) as u64;
929 } else {
930 archive.blocks[i].has_crc = false;
931 }
932 }
933 nid = {
934 let mut b = [0u8; 1];
935 AsyncReadExt::read_exact(header, &mut b).await?;
936 b[0]
937 };
938 }
939 if nid != K_END {
940 return Err(Error::BadTerminatedUnpackInfo);
941 }
942
943 Ok(())
944 }
945
946 async fn read_sub_streams_info<R: AsyncRead + Unpin>(
947 header: &mut R,
948 archive: &mut Archive,
949 ) -> Result<(), Error> {
950 for block in archive.blocks.iter_mut() {
951 block.num_unpack_sub_streams = 1;
952 }
953 let mut total_unpack_streams = archive.blocks.len();
954
955 let mut nid = {
956 let mut b = [0u8; 1];
957 AsyncReadExt::read_exact(header, &mut b).await?;
958 b[0]
959 };
960 if nid == K_NUM_UNPACK_STREAM {
961 total_unpack_streams = 0;
962 for block in archive.blocks.iter_mut() {
963 let num_streams = read_variable_usize(header, "numStreams").await?;
964 block.num_unpack_sub_streams = num_streams;
965 total_unpack_streams += num_streams;
966 }
967 nid = {
968 let mut b = [0u8; 1];
969 AsyncReadExt::read_exact(header, &mut b).await?;
970 b[0]
971 };
972 }
973
974 let mut sub_streams_info = SubStreamsInfo::default();
975 sub_streams_info
976 .unpack_sizes
977 .resize(total_unpack_streams, Default::default());
978 sub_streams_info
979 .has_crc
980 .reserve_len_exact(total_unpack_streams);
981 sub_streams_info.crcs = vec![0; total_unpack_streams];
982
983 let mut next_unpack_stream = 0;
984 for block in archive.blocks.iter() {
985 if block.num_unpack_sub_streams == 0 {
986 continue;
987 }
988 let mut sum = 0;
989 if nid == K_SIZE {
990 for _i in 0..block.num_unpack_sub_streams - 1 {
991 let size = read_variable_u64(header).await?;
992 sub_streams_info.unpack_sizes[next_unpack_stream] = size;
993 next_unpack_stream += 1;
994 sum += size;
995 }
996 }
997 if sum > block.get_unpack_size() {
998 return Err(Error::other(
999 "sum of unpack sizes of block exceeds total unpack size",
1000 ));
1001 }
1002 sub_streams_info.unpack_sizes[next_unpack_stream] = block.get_unpack_size() - sum;
1004 next_unpack_stream += 1;
1005 }
1006 if nid == K_SIZE {
1007 nid = {
1008 let mut b = [0u8; 1];
1009 AsyncReadExt::read_exact(header, &mut b).await?;
1010 b[0]
1011 };
1012 }
1013
1014 let mut num_digests = 0;
1015 for block in archive.blocks.iter() {
1016 if block.num_unpack_sub_streams != 1 || !block.has_crc {
1017 num_digests += block.num_unpack_sub_streams;
1018 }
1019 }
1020
1021 if nid == K_CRC {
1022 let has_missing_crc = read_all_or_bits(header, num_digests).await?;
1023 let mut missing_crcs = vec![0; num_digests];
1024 for (i, missing_crc) in missing_crcs.iter_mut().enumerate() {
1025 if has_missing_crc.contains(i) {
1026 let mut b4 = [0u8; 4];
1027 AsyncReadExt::read_exact(header, &mut b4).await?;
1028 *missing_crc = u32::from_le_bytes(b4) as u64;
1029 }
1030 }
1031 let mut next_crc = 0;
1032 let mut next_missing_crc = 0;
1033 for block in archive.blocks.iter() {
1034 if block.num_unpack_sub_streams == 1 && block.has_crc {
1035 sub_streams_info.has_crc.insert(next_crc);
1036 sub_streams_info.crcs[next_crc] = block.crc;
1037 next_crc += 1;
1038 } else {
1039 for _i in 0..block.num_unpack_sub_streams {
1040 if has_missing_crc.contains(next_missing_crc) {
1041 sub_streams_info.has_crc.insert(next_crc);
1042 } else {
1043 sub_streams_info.has_crc.remove(next_crc);
1044 }
1045 sub_streams_info.crcs[next_crc] = missing_crcs[next_missing_crc];
1046 next_crc += 1;
1047 next_missing_crc += 1;
1048 }
1049 }
1050 }
1051
1052 nid = {
1053 let mut b = [0u8; 1];
1054 AsyncReadExt::read_exact(header, &mut b).await?;
1055 b[0]
1056 };
1057 }
1058
1059 if nid != K_END {
1060 return Err(Error::BadTerminatedSubStreamsInfo);
1061 }
1062
1063 archive.sub_streams_info = Some(sub_streams_info);
1064 Ok(())
1065 }
1066
1067 async fn read_block<R: AsyncRead + Unpin>(header: &mut R) -> Result<Block, Error> {
1068 let mut block = Block::default();
1069
1070 let num_coders = read_variable_usize(header, "num coders").await?;
1071 let mut coders = Vec::with_capacity(num_coders);
1072 let mut total_in_streams = 0;
1073 let mut total_out_streams = 0;
1074 for _i in 0..num_coders {
1075 let mut coder = Coder::default();
1076 let bits = {
1077 let mut b = [0u8; 1];
1078 AsyncReadExt::read_exact(header, &mut b).await?;
1079 b[0]
1080 };
1081 let id_size = bits & 0xF;
1082 let is_simple = (bits & 0x10) == 0;
1083 let has_attributes = (bits & 0x20) != 0;
1084 let more_alternative_methods = (bits & 0x80) != 0;
1085
1086 coder.id_size = id_size as usize;
1087
1088 AsyncReadExt::read_exact(header, coder.decompression_method_id_mut()).await?;
1089 if is_simple {
1090 coder.num_in_streams = 1;
1091 coder.num_out_streams = 1;
1092 } else {
1093 coder.num_in_streams = read_variable_u64(header).await?;
1094 coder.num_out_streams = read_variable_u64(header).await?;
1095 }
1096 total_in_streams += coder.num_in_streams;
1097 total_out_streams += coder.num_out_streams;
1098 if has_attributes {
1099 let properties_size = read_variable_usize(header, "properties size").await?;
1100 let mut props = vec![0u8; properties_size];
1101 AsyncReadExt::read_exact(header, &mut props).await?;
1102 coder.properties = props;
1103 }
1104 coders.push(coder);
1105 if more_alternative_methods {
1107 return Err(Error::other(
1108 "Alternative methods are unsupported, please report. The reference implementation doesn't support them either.",
1109 ));
1110 }
1111 }
1112 block.coders = coders;
1113 let total_in_streams = assert_usize(total_in_streams, "totalInStreams")?;
1114 let total_out_streams = assert_usize(total_out_streams, "totalOutStreams")?;
1115 block.total_input_streams = total_in_streams;
1116 block.total_output_streams = total_out_streams;
1117
1118 if total_out_streams == 0 {
1119 return Err(Error::other("Total output streams can't be 0"));
1120 }
1121 let num_bind_pairs = total_out_streams - 1;
1122 let mut bind_pairs = Vec::with_capacity(num_bind_pairs);
1123 for _ in 0..num_bind_pairs {
1124 let bp = BindPair {
1125 in_index: read_variable_u64(header).await?,
1126 out_index: read_variable_u64(header).await?,
1127 };
1128 bind_pairs.push(bp);
1129 }
1130 block.bind_pairs = bind_pairs;
1131
1132 if total_in_streams < num_bind_pairs {
1133 return Err(Error::other(
1134 "Total input streams can't be less than the number of bind pairs",
1135 ));
1136 }
1137 let num_packed_streams = total_in_streams - num_bind_pairs;
1138 let mut packed_streams = vec![0; num_packed_streams];
1139 if num_packed_streams == 1 {
1140 let mut index = u64::MAX;
1141 for i in 0..total_in_streams {
1142 if block.find_bind_pair_for_in_stream(i as u64).is_none() {
1143 index = i as u64;
1144 break;
1145 }
1146 }
1147 if index == u64::MAX {
1148 return Err(Error::other("Couldn't find stream's bind pair index"));
1149 }
1150 packed_streams[0] = index;
1151 } else {
1152 for packed_stream in packed_streams.iter_mut() {
1153 *packed_stream = read_variable_u64(header).await?;
1154 }
1155 }
1156 block.packed_streams = packed_streams;
1157
1158 Ok(block)
1159 }
1160}
1161
1162#[inline]
1163async fn read_variable_usize<R: AsyncRead + Unpin>(
1164 reader: &mut R,
1165 field: &str,
1166) -> Result<usize, Error> {
1167 let size = read_variable_u64(reader).await?;
1168 assert_usize(size, field)
1169}
1170
1171#[inline]
1172fn assert_usize(size: u64, field: &str) -> Result<usize, Error> {
1173 if size > usize::MAX as u64 {
1174 return Err(Error::other(format!("Cannot handle {field} {size}")));
1175 }
1176 Ok(size as usize)
1177}
1178
1179async fn read_variable_u64<R: AsyncRead + Unpin>(reader: &mut R) -> io::Result<u64> {
1180 let first = {
1181 let mut b = [0u8; 1];
1182 AsyncReadExt::read_exact(reader, &mut b).await?;
1183 b[0] as u64
1184 };
1185 let mut mask = 0x80_u64;
1186 let mut value = 0;
1187 for i in 0..8 {
1188 if (first & mask) == 0 {
1189 return Ok(value | ((first & (mask - 1)) << (8 * i)));
1190 }
1191 let b = {
1192 let mut bb = [0u8; 1];
1193 AsyncReadExt::read_exact(reader, &mut bb).await?;
1194 bb[0] as u64
1195 };
1196 value |= b << (8 * i);
1197 mask >>= 1;
1198 }
1199 Ok(value)
1200}
1201
1202async fn read_all_or_bits<R: AsyncRead + Unpin>(header: &mut R, size: usize) -> io::Result<BitSet> {
1203 let all = {
1204 let mut b = [0u8; 1];
1205 AsyncReadExt::read_exact(header, &mut b).await?;
1206 b[0]
1207 };
1208 if all != 0 {
1209 let mut bits = BitSet::with_capacity(size);
1210 for i in 0..size {
1211 bits.insert(i);
1212 }
1213 Ok(bits)
1214 } else {
1215 read_bits(header, size).await
1216 }
1217}
1218
1219async fn read_bits<R: AsyncRead + Unpin>(header: &mut R, size: usize) -> io::Result<BitSet> {
1220 let mut bits = BitSet::with_capacity(size);
1221 let mut mask = 0u32;
1222 let mut cache = 0u32;
1223 for i in 0..size {
1224 if mask == 0 {
1225 mask = 0x80;
1226 let mut b = [0u8; 1];
1227 AsyncReadExt::read_exact(header, &mut b).await?;
1228 cache = b[0] as u32;
1229 }
1230 if (cache & mask) != 0 {
1231 bits.insert(i);
1232 }
1233 mask >>= 1;
1234 }
1235 Ok(bits)
1236}
1237
/// Location of a named entry, as stored in `ArchiveReader::index`.
#[derive(Copy, Clone, Debug)]
struct IndexEntry {
    /// Block holding the entry's data, or `None` when the entry has no block.
    block_index: Option<usize>,
    /// Position of the entry in `Archive::files`.
    file_index: usize,
}
1243
1244pub struct ArchiveReader<R: futures::io::AsyncRead + futures::io::AsyncSeek + Unpin> {
1246 source: R,
1247 archive: Archive,
1248 password: Password,
1249 thread_count: u32,
1250 index: HashMap<String, IndexEntry>,
1251}
1252
1253#[cfg(not(target_arch = "wasm32"))]
1254impl ArchiveReader<futures::io::Cursor<Vec<u8>>> {
1255 pub async fn open(
1257 path: impl AsRef<std::path::Path>,
1258 password: Password,
1259 ) -> Result<Self, Error> {
1260 let data = afs::read(path.as_ref())
1261 .await
1262 .map_err(|e| Error::file_open(e, path.as_ref().to_string_lossy().to_string()))?;
1263 let cursor = futures::io::Cursor::new(data);
1264 Self::new(cursor, password).await
1265 }
1266
1267 pub async fn open_from_bytes(data: Vec<u8>, password: Password) -> Result<Self, Error> {
1269 let cursor = futures::io::Cursor::new(data);
1270 Self::new(cursor, password).await
1271 }
1272}
1273
1274impl<R: futures::io::AsyncRead + futures::io::AsyncSeek + Unpin> ArchiveReader<R> {
1275 #[inline]
1277 pub(crate) async fn new(mut source: R, password: Password) -> Result<Self, Error> {
1278 let archive = Archive::read(&mut source, &password).await?;
1279
1280 let mut reader = Self {
1281 source,
1282 archive,
1283 password,
1284 thread_count: 1,
1285 index: HashMap::default(),
1286 };
1287
1288 reader.fill_index();
1289
1290 let thread_count =
1291 std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
1292 reader.set_thread_count(thread_count.get() as u32);
1293
1294 Ok(reader)
1295 }
1296
1297 #[inline]
1307 pub fn from_archive(archive: Archive, source: R, password: Password) -> Self {
1308 let mut reader = Self {
1309 source,
1310 archive,
1311 password,
1312 thread_count: 1,
1313 index: HashMap::default(),
1314 };
1315
1316 reader.fill_index();
1317
1318 let thread_count =
1319 std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
1320 reader.set_thread_count(thread_count.get() as u32);
1321
1322 reader
1323 }
1324
1325 pub fn set_thread_count(&mut self, thread_count: u32) {
1330 self.thread_count = thread_count.clamp(1, 256);
1331 }
1332
1333 fn fill_index(&mut self) {
1334 for (file_index, file) in self.archive.files.iter().enumerate() {
1335 let block_index = self.archive.stream_map.file_block_index[file_index];
1336
1337 self.index.insert(
1338 file.name.clone(),
1339 IndexEntry {
1340 block_index,
1341 file_index,
1342 },
1343 );
1344 }
1345 }
1346
1347 #[inline]
1352 pub fn archive(&self) -> &Archive {
1353 &self.archive
1354 }
1355
1356 async fn build_decode_stack<'r>(
1357 source: &'r mut R,
1358 archive: &Archive,
1359 block_index: usize,
1360 password: &Password,
1361 thread_count: u32,
1362 ) -> Result<(Box<dyn futures::io::AsyncRead + Unpin + 'r>, usize), Error> {
1363 let block = &archive.blocks[block_index];
1364 if block.total_input_streams > block.total_output_streams {
1365 return Self::build_decode_stack2(source, archive, block_index, password, thread_count);
1366 }
1367 let first_pack_stream_index = archive.stream_map.block_first_pack_stream_index[block_index];
1368 let block_offset = SIGNATURE_HEADER_SIZE
1369 + archive.pack_pos
1370 + archive.stream_map.pack_stream_offsets[first_pack_stream_index];
1371
1372 let (mut has_crc, mut crc) = (block.has_crc, block.crc);
1373
1374 if !has_crc && block.num_unpack_sub_streams == 1 {
1376 if let Some(sub_streams_info) = archive.sub_streams_info.as_ref() {
1377 let mut substream_index = 0;
1378 for i in 0..block_index {
1379 substream_index += archive.blocks[i].num_unpack_sub_streams;
1380 }
1381
1382 if sub_streams_info.has_crc.contains(substream_index) {
1385 has_crc = true;
1386 crc = sub_streams_info.crcs[substream_index];
1387 }
1388 }
1389 }
1390
1391 AsyncSeekExt::seek(source, SeekFrom::Start(block_offset)).await?;
1392 let pack_size = archive.pack_sizes[first_pack_stream_index] as usize;
1393
1394 let mut decoder: Box<dyn futures::io::AsyncRead + Unpin> =
1395 Box::new(BoundedReader::new(source, pack_size));
1396 let block = &archive.blocks[block_index];
1397 for (index, coder) in block.ordered_coder_iter() {
1398 if coder.num_in_streams != 1 || coder.num_out_streams != 1 {
1399 return Err(Error::unsupported(
1400 "Multi input/output stream coders are not supported",
1401 ));
1402 }
1403 let next = add_decoder(
1404 decoder,
1405 block.get_unpack_size_at_index(index) as usize,
1406 coder,
1407 password,
1408 MAX_MEM_LIMIT_KB,
1409 thread_count,
1410 )
1411 .await?;
1412 decoder = Box::new(next);
1413 }
1414 if has_crc {
1415 decoder = Box::new(Crc32VerifyingReader::new(
1416 decoder,
1417 block.get_unpack_size() as usize,
1418 crc,
1419 ));
1420 }
1421
1422 Ok((decoder, pack_size))
1423 }
1424
1425 fn build_decode_stack2<'r>(
1426 source: &'r mut R,
1427 archive: &Archive,
1428 block_index: usize,
1429 password: &Password,
1430 thread_count: u32,
1431 ) -> Result<(Box<dyn futures::io::AsyncRead + Unpin + 'r>, usize), Error> {
1432 const MAX_CODER_COUNT: usize = 32;
1433 let block = &archive.blocks[block_index];
1434 if block.coders.len() > MAX_CODER_COUNT {
1435 return Err(Error::unsupported(format!(
1436 "Too many coders: {}",
1437 block.coders.len()
1438 )));
1439 }
1440
1441 assert!(block.total_input_streams > block.total_output_streams);
1442 let shared_source = Rc::new(RefCell::new(source));
1443 let first_pack_stream_index = archive.stream_map.block_first_pack_stream_index[block_index];
1444 let start_pos = SIGNATURE_HEADER_SIZE + archive.pack_pos;
1445 let offsets = &archive.stream_map.pack_stream_offsets[first_pack_stream_index..];
1446
1447 let mut sources = Vec::with_capacity(block.packed_streams.len());
1448
1449 for (i, offset) in offsets[..block.packed_streams.len()].iter().enumerate() {
1450 let pack_pos = start_pos + offset;
1451 let pack_size = archive.pack_sizes[first_pack_stream_index + i];
1452
1453 let pack_reader = SharedBoundedReader::new(
1454 Rc::clone(&shared_source),
1455 (pack_pos, pack_pos + pack_size),
1456 );
1457
1458 sources.push(pack_reader);
1459 }
1460
1461 let mut coder_to_stream_map = [usize::MAX; MAX_CODER_COUNT];
1462
1463 let mut si = 0;
1464 for (i, coder) in block.coders.iter().enumerate() {
1465 coder_to_stream_map[i] = si;
1466 si += coder.num_in_streams as usize;
1467 }
1468
1469 let main_coder_index = {
1470 let mut coder_used = [false; MAX_CODER_COUNT];
1471 for bp in block.bind_pairs.iter() {
1472 coder_used[bp.out_index as usize] = true;
1473 }
1474 let mut mci = 0;
1475 for (i, used) in coder_used[..block.coders.len()].iter().enumerate() {
1476 if !used {
1477 mci = i;
1478 break;
1479 }
1480 }
1481 mci
1482 };
1483
1484 let id = block.coders[main_coder_index].encoder_method_id();
1485 if id != EncoderMethod::ID_BCJ2 {
1486 return Err(Error::unsupported(format!("Unsupported method: {id:?}")));
1487 }
1488
1489 let num_in_streams = block.coders[main_coder_index].num_in_streams as usize;
1490 let mut inputs: Vec<Box<dyn futures::io::AsyncRead + Unpin>> =
1491 Vec::with_capacity(num_in_streams);
1492 let start_i = coder_to_stream_map[main_coder_index];
1493 for i in start_i..num_in_streams + start_i {
1494 inputs.push(Self::get_in_stream(
1495 block,
1496 &sources,
1497 &coder_to_stream_map,
1498 password,
1499 i,
1500 thread_count,
1501 )?);
1502 }
1503 let inputs_std = inputs
1504 .into_iter()
1505 .map(crate::util::decompress::AsyncReadSeekAsStd::new)
1506 .collect::<Vec<_>>();
1507 let mut decoder: Box<dyn futures::io::AsyncRead + Unpin> = Box::new(AsyncStdRead::new(
1508 Bcj2Reader::new(inputs_std, block.get_unpack_size()),
1509 ));
1510 if block.has_crc {
1511 decoder = Box::new(Crc32VerifyingReader::new(
1512 decoder,
1513 block.get_unpack_size() as usize,
1514 block.crc,
1515 ));
1516 }
1517 Ok((
1518 decoder,
1519 archive.pack_sizes[first_pack_stream_index] as usize,
1520 ))
1521 }
1522
1523 fn get_in_stream<'r>(
1524 block: &Block,
1525 sources: &[SharedBoundedReader<'r, R>],
1526 coder_to_stream_map: &[usize],
1527 password: &Password,
1528 in_stream_index: usize,
1529 thread_count: u32,
1530 ) -> Result<Box<dyn futures::io::AsyncRead + Unpin + 'r>, Error>
1531 where
1532 R: 'r,
1533 {
1534 let index = block
1535 .packed_streams
1536 .iter()
1537 .position(|&i| i == in_stream_index as u64);
1538 if let Some(index) = index {
1539 return Ok(Box::new(sources[index].clone()));
1540 }
1541
1542 let bp = block
1543 .find_bind_pair_for_in_stream(in_stream_index as u64)
1544 .ok_or_else(|| {
1545 Error::other(format!(
1546 "Couldn't find bind pair for stream {in_stream_index}"
1547 ))
1548 })?;
1549 let index = bp.out_index as usize;
1550
1551 Self::get_in_stream2(
1552 block,
1553 sources,
1554 coder_to_stream_map,
1555 password,
1556 index,
1557 thread_count,
1558 )
1559 }
1560
1561 fn get_in_stream2<'r>(
1562 block: &Block,
1563 sources: &[SharedBoundedReader<'r, R>],
1564 coder_to_stream_map: &[usize],
1565 password: &Password,
1566 in_stream_index: usize,
1567 thread_count: u32,
1568 ) -> Result<Box<dyn futures::io::AsyncRead + Unpin + 'r>, Error>
1569 where
1570 R: 'r,
1571 {
1572 let coder = &block.coders[in_stream_index];
1573 let start_index = coder_to_stream_map[in_stream_index];
1574 if start_index == usize::MAX {
1575 return Err(Error::other("in_stream_index out of range"));
1576 }
1577 let uncompressed_len = block.unpack_sizes[in_stream_index] as usize;
1578 if coder.num_in_streams == 1 {
1579 let input = Self::get_in_stream(
1580 block,
1581 sources,
1582 coder_to_stream_map,
1583 password,
1584 start_index,
1585 thread_count,
1586 )?;
1587
1588 let decoder = async_io::block_on(add_decoder(
1589 input,
1590 uncompressed_len,
1591 coder,
1592 password,
1593 MAX_MEM_LIMIT_KB,
1594 thread_count,
1595 ))?;
1596 return Ok(Box::new(decoder));
1597 }
1598 Err(Error::unsupported(
1599 "Multi input stream coders are not yet supported",
1600 ))
1601 }
1602
1603 pub(crate) async fn for_each_entries<
1604 F: for<'a> FnMut(
1605 &'a ArchiveEntry,
1606 &'a mut (dyn futures::io::AsyncRead + Unpin + 'a),
1607 ) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + 'a>>,
1608 >(
1609 &mut self,
1610 mut each: F,
1611 ) -> Result<(), Error> {
1612 let block_count = self.archive.blocks.len();
1613 for block_index in 0..block_count {
1614 let forder_dec = BlockDecoder::new(
1615 self.thread_count,
1616 block_index,
1617 &self.archive,
1618 &self.password,
1619 &mut self.source,
1620 );
1621 if !forder_dec
1622 .for_each_entries(&mut each)
1623 .await
1624 .map_err(|e| e.maybe_bad_password(!self.password.is_empty()))?
1625 {
1626 return Ok(());
1627 }
1628 }
1629 for file_index in 0..self.archive.files.len() {
1631 let block_index = self.archive.stream_map.file_block_index[file_index];
1632 if block_index.is_none() {
1633 let file = &self.archive.files[file_index];
1634 let mut empty_reader = AsyncStdRead::new([0u8; 0].as_slice());
1635 if !each(file, &mut empty_reader).await? {
1636 return Ok(());
1637 }
1638 }
1639 }
1640 Ok(())
1641 }
1642
1643 pub async fn read_file(&mut self, name: &str) -> Result<Vec<u8>, Error> {
1649 let index_entry = *self.index.get(name).ok_or(Error::FileNotFound)?;
1650 let file = &self.archive.files[index_entry.file_index];
1651
1652 if !file.has_stream {
1653 return Ok(Vec::new());
1654 }
1655
1656 let block_index = index_entry
1657 .block_index
1658 .ok_or_else(|| Error::other("File has no associated block"))?;
1659
1660 match self.archive.is_solid {
1661 true => {
1662 use std::rc::Rc;
1663 let result_cell: Rc<RefCell<Option<Vec<u8>>>> = Rc::new(RefCell::new(None));
1664 let target_file_ptr = file as *const _;
1665
1666 BlockDecoder::new(
1667 self.thread_count,
1668 block_index,
1669 &self.archive,
1670 &self.password,
1671 &mut self.source,
1672 )
1673 .for_each_entries(&mut |archive_entry, reader| {
1674 let result_cell = Rc::clone(&result_cell);
1675 Box::pin(async move {
1676 let mut data = Vec::with_capacity(archive_entry.size as usize);
1677 AsyncReadExt::read_to_end(reader, &mut data).await?;
1678 if std::ptr::eq(archive_entry, target_file_ptr) {
1679 *result_cell.borrow_mut() = Some(data);
1680 Ok(false)
1681 } else {
1682 Ok(true)
1683 }
1684 })
1685 })
1686 .await?;
1687
1688 result_cell.borrow_mut().take().ok_or(Error::FileNotFound)
1689 }
1690 false => {
1691 let pack_index = self.archive.stream_map.block_first_pack_stream_index[block_index];
1692 let pack_offset = self.archive.stream_map.pack_stream_offsets[pack_index];
1693 let block_offset = SIGNATURE_HEADER_SIZE + self.archive.pack_pos + pack_offset;
1694
1695 AsyncSeekExt::seek(&mut self.source, SeekFrom::Start(block_offset)).await?;
1696
1697 let (mut block_reader, _size) = Self::build_decode_stack(
1698 &mut self.source,
1699 &self.archive,
1700 block_index,
1701 &self.password,
1702 self.thread_count,
1703 )
1704 .await?;
1705
1706 let mut data = Vec::with_capacity(file.size as usize);
1707 let mut decoder: Box<dyn futures::io::AsyncRead + Unpin> =
1708 Box::new(BoundedReader::new(&mut block_reader, file.size as usize));
1709
1710 if file.has_crc {
1711 decoder = Box::new(Crc32VerifyingReader::new(
1712 decoder,
1713 file.size as usize,
1714 file.crc,
1715 ));
1716 }
1717
1718 AsyncReadExt::read_to_end(&mut decoder, &mut data).await?;
1719
1720 Ok(data)
1721 }
1722 }
1723 }
1724
1725 pub fn file_compression_methods(
1727 &self,
1728 file_name: &str,
1729 methods: &mut Vec<EncoderMethod>,
1730 ) -> Result<(), Error> {
1731 let index_entry = self.index.get(file_name).ok_or(Error::FileNotFound)?;
1732 let file = &self.archive.files[index_entry.file_index];
1733
1734 if !file.has_stream {
1735 return Ok(());
1736 }
1737
1738 let block_index = index_entry
1739 .block_index
1740 .ok_or_else(|| Error::other("File has no associated block"))?;
1741
1742 let block = self
1743 .archive
1744 .blocks
1745 .get(block_index)
1746 .ok_or_else(|| Error::other("Block not found"))?;
1747
1748 block
1749 .coders
1750 .iter()
1751 .filter_map(|coder| EncoderMethod::by_id(coder.encoder_method_id()))
1752 .for_each(|method| {
1753 methods.push(method);
1754 });
1755
1756 Ok(())
1757 }
1758}
1759
1760pub struct BlockDecoder<'a, R: AsyncRead + AsyncSeek + Unpin> {
1765 thread_count: u32,
1766 block_index: usize,
1767 archive: &'a Archive,
1768 password: &'a Password,
1769 source: &'a mut R,
1770}
1771
1772impl<'a, R: AsyncRead + AsyncSeek + Unpin> BlockDecoder<'a, R> {
1773 pub fn new(
1783 thread_count: u32,
1784 block_index: usize,
1785 archive: &'a Archive,
1786 password: &'a Password,
1787 source: &'a mut R,
1788 ) -> Self {
1789 Self {
1790 thread_count,
1791 block_index,
1792 archive,
1793 password,
1794 source,
1795 }
1796 }
1797
1798 pub fn set_thread_count(&mut self, thread_count: u32) {
1801 self.thread_count = thread_count.clamp(1, 256);
1802 }
1803
1804 pub fn entries(&self) -> &[ArchiveEntry] {
1808 let start = self.archive.stream_map.block_first_file_index[self.block_index];
1809 let file_count = self.archive.blocks[self.block_index].num_unpack_sub_streams;
1810 &self.archive.files[start..(file_count + start)]
1811 }
1812
1813 pub fn entry_count(&self) -> usize {
1815 self.archive.blocks[self.block_index].num_unpack_sub_streams
1816 }
1817
1818 pub async fn for_each_entries<
1825 F: for<'b> FnMut(
1826 &'b ArchiveEntry,
1827 &'b mut (dyn futures::io::AsyncRead + Unpin + 'b),
1828 ) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + 'b>>,
1829 >(
1830 self,
1831 each: &mut F,
1832 ) -> Result<bool, Error> {
1833 let Self {
1834 thread_count,
1835 block_index,
1836 archive,
1837 password,
1838 source,
1839 } = self;
1840 let (mut block_reader, _size) =
1841 ArchiveReader::build_decode_stack(source, archive, block_index, password, thread_count)
1842 .await?;
1843 let start = archive.stream_map.block_first_file_index[block_index];
1844 let file_count = archive.blocks[block_index].num_unpack_sub_streams;
1845
1846 for file_index in start..(file_count + start) {
1847 let file = &archive.files[file_index];
1848 if file.has_stream && file.size > 0 {
1849 let mut decoder: Box<dyn futures::io::AsyncRead + Unpin> =
1850 Box::new(BoundedReader::new(&mut block_reader, file.size as usize));
1851 if file.has_crc {
1852 decoder = Box::new(Crc32VerifyingReader::new(
1853 decoder,
1854 file.size as usize,
1855 file.crc,
1856 ));
1857 }
1858 {
1859 let cont = each(file, &mut decoder)
1860 .await
1861 .map_err(|e| e.maybe_bad_password(!password.is_empty()))?;
1862 if !cont {
1863 return Ok(false);
1864 }
1865 }
1866 } else {
1867 let mut empty_reader = AsyncStdRead::new([0u8; 0].as_slice());
1868 if !each(file, &mut empty_reader).await? {
1869 return Ok(false);
1870 }
1871 }
1872 }
1873 Ok(true)
1874 }
1875}