lzma_rust2/lzip/
reader_mt.rs1use std::{
2 io::{self, Cursor, Seek, SeekFrom},
3 sync::{
4 atomic::{AtomicBool, AtomicU32, Ordering},
5 mpsc::SyncSender,
6 Arc, Mutex,
7 },
8};
9
10use super::{scan_members, LzipMember};
11use crate::{
12 set_error,
13 work_pool::{WorkPool, WorkPoolConfig, WorkPoolState},
14 work_queue::WorkerHandle,
15 LzipReader, Read,
16};
17
/// A single job handed to a worker thread.
#[derive(Debug)]
struct WorkUnit {
    /// Raw bytes of one lzip member, exactly as read from the underlying
    /// reader (still compressed).
    member_data: Vec<u8>,
}
23
24pub struct LzipReaderMt<R: Read + Seek> {
26 inner: R,
27 members: Vec<LzipMember>,
28 work_pool: WorkPool<WorkUnit, Vec<u8>>,
29 current_chunk: Cursor<Vec<u8>>,
30}
31
32impl<R: Read + Seek> LzipReaderMt<R> {
33 pub fn new(inner: R, num_workers: u32) -> io::Result<Self> {
38 let (inner, members) = scan_members(inner)?;
39 let num_members = members.len() as u64;
40
41 Ok(Self {
42 inner,
43 members,
44 work_pool: WorkPool::new(
45 WorkPoolConfig::new(num_workers, num_members),
46 worker_thread_logic,
47 ),
48 current_chunk: Cursor::new(Vec::new()),
49 })
50 }
51
52 pub fn member_count(&self) -> usize {
54 self.members.len()
55 }
56
57 fn get_next_uncompressed_chunk(&mut self) -> io::Result<Option<Vec<u8>>> {
58 if matches!(self.work_pool.state(), WorkPoolState::Finished) {
60 return Ok(None);
61 }
62
63 self.work_pool.get_result(|index| {
64 let member = &self.members[index as usize];
65 self.inner.seek(SeekFrom::Start(member.start_pos)).unwrap();
66 let mut member_data = vec![0u8; member.compressed_size as usize];
67 self.inner.read_exact(&mut member_data).unwrap();
68 Ok(WorkUnit { member_data })
69 })
70 }
71}
72
73fn worker_thread_logic(
75 worker_handle: WorkerHandle<(u64, WorkUnit)>,
76 result_tx: SyncSender<(u64, Vec<u8>)>,
77 shutdown_flag: Arc<AtomicBool>,
78 error_store: Arc<Mutex<Option<io::Error>>>,
79 active_workers: Arc<AtomicU32>,
80) {
81 while !shutdown_flag.load(Ordering::Acquire) {
82 let work_unit = match worker_handle.steal() {
83 Some(work) => {
84 active_workers.fetch_add(1, Ordering::Release);
85 work
86 }
87 None => {
88 break;
90 }
91 };
92
93 let (index, WorkUnit { member_data }) = work_unit;
94
95 let reader_result = LzipReader::new(member_data.as_slice());
96
97 let mut lzip_reader = match reader_result {
98 Ok(reader) => reader,
99 Err(error) => {
100 active_workers.fetch_sub(1, Ordering::Release);
101 set_error(error, &error_store, &shutdown_flag);
102 return;
103 }
104 };
105
106 let mut decompressed_data = Vec::new();
107 let result = match lzip_reader.read_to_end(&mut decompressed_data) {
108 Ok(_) => decompressed_data,
109 Err(error) => {
110 active_workers.fetch_sub(1, Ordering::Release);
111 set_error(error, &error_store, &shutdown_flag);
112 return;
113 }
114 };
115
116 if result_tx.send((index, result)).is_err() {
117 active_workers.fetch_sub(1, Ordering::Release);
118 return;
119 }
120
121 active_workers.fetch_sub(1, Ordering::Release);
122 }
123}
124
125impl<R: Read + Seek> Read for LzipReaderMt<R> {
126 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
127 if buf.is_empty() {
128 return Ok(0);
129 }
130
131 let bytes_read = self.current_chunk.read(buf)?;
132
133 if bytes_read > 0 {
134 return Ok(bytes_read);
135 }
136
137 let chunk_data = self.get_next_uncompressed_chunk()?;
138
139 let Some(chunk_data) = chunk_data else {
140 return Ok(0);
142 };
143
144 self.current_chunk = Cursor::new(chunk_data);
145
146 self.read(buf)
148 }
149}