lzma_rust2/lzip/
reader_mt.rs1use std::{
2 io::{self, Cursor, Seek, SeekFrom},
3 sync::{
4 atomic::{AtomicBool, AtomicU32, Ordering},
5 mpsc::SyncSender,
6 Arc, Mutex,
7 },
8};
9
10use super::{scan_members, LzipMember};
11use crate::{
12 set_error,
13 work_pool::{WorkPool, WorkPoolConfig, WorkPoolState},
14 work_queue::WorkerHandle,
15 LzipReader, Read,
16};
17
/// A single job for a decoder thread: the bytes of one LZIP member.
#[derive(Debug)]
struct WorkUnit {
    /// Compressed member bytes as read from the underlying reader; the worker wraps this
    /// buffer in an `LzipReader` to decode it.
    member_data: Vec<u8>,
}
23
24pub struct LzipReaderMt<R: Read + Seek> {
26 inner: R,
27 members: Vec<LzipMember>,
28 work_pool: WorkPool<WorkUnit, Vec<u8>>,
29 current_chunk: Cursor<Vec<u8>>,
30}
31
32impl<R: Read + Seek> LzipReaderMt<R> {
33 pub fn new(inner: R, num_workers: u32) -> io::Result<Self> {
38 let (inner, members) = scan_members(inner)?;
39 let num_members = members.len() as u64;
40
41 Ok(Self {
42 inner,
43 members,
44 work_pool: WorkPool::new(
45 WorkPoolConfig::new(num_workers, num_members),
46 worker_thread_logic,
47 ),
48 current_chunk: Cursor::new(Vec::new()),
49 })
50 }
51
52 pub fn member_count(&self) -> usize {
54 self.members.len()
55 }
56
57 fn get_next_uncompressed_chunk(&mut self) -> io::Result<Option<Vec<u8>>> {
58 if matches!(self.work_pool.state(), WorkPoolState::Finished) {
60 return Ok(None);
61 }
62
63 self.work_pool.get_result(|index| {
64 let member = &self.members[index as usize];
65 self.inner.seek(SeekFrom::Start(member.start_pos)).unwrap();
66 let mut member_data = vec![0u8; member.compressed_size as usize];
67 self.inner.read_exact(&mut member_data).unwrap();
68 Ok(WorkUnit { member_data })
69 })
70 }
71}
72
73fn worker_thread_logic(
75 worker_handle: WorkerHandle<(u64, WorkUnit)>,
76 result_tx: SyncSender<(u64, Vec<u8>)>,
77 shutdown_flag: Arc<AtomicBool>,
78 error_store: Arc<Mutex<Option<io::Error>>>,
79 active_workers: Arc<AtomicU32>,
80) {
81 while !shutdown_flag.load(Ordering::Acquire) {
82 let work_unit = match worker_handle.steal() {
83 Some(work) => {
84 active_workers.fetch_add(1, Ordering::Release);
85 work
86 }
87 None => {
88 break;
90 }
91 };
92
93 let (index, WorkUnit { member_data }) = work_unit;
94
95 let mut lzip_reader = LzipReader::new(member_data.as_slice());
96
97 let mut decompressed_data = Vec::new();
98 let result = match lzip_reader.read_to_end(&mut decompressed_data) {
99 Ok(_) => decompressed_data,
100 Err(error) => {
101 active_workers.fetch_sub(1, Ordering::Release);
102 set_error(error, &error_store, &shutdown_flag);
103 return;
104 }
105 };
106
107 if result_tx.send((index, result)).is_err() {
108 active_workers.fetch_sub(1, Ordering::Release);
109 return;
110 }
111
112 active_workers.fetch_sub(1, Ordering::Release);
113 }
114}
115
116impl<R: Read + Seek> Read for LzipReaderMt<R> {
117 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
118 if buf.is_empty() {
119 return Ok(0);
120 }
121
122 let bytes_read = self.current_chunk.read(buf)?;
123
124 if bytes_read > 0 {
125 return Ok(bytes_read);
126 }
127
128 let chunk_data = self.get_next_uncompressed_chunk()?;
129
130 let Some(chunk_data) = chunk_data else {
131 return Ok(0);
133 };
134
135 self.current_chunk = Cursor::new(chunk_data);
136
137 self.read(buf)
139 }
140}