lzma_rust2/lzip/
reader_mt.rs

1use std::{
2    io::{self, Cursor, Seek, SeekFrom},
3    sync::{
4        atomic::{AtomicBool, AtomicU32, Ordering},
5        mpsc::SyncSender,
6        Arc, Mutex,
7    },
8};
9
10use super::{scan_members, LzipMember};
11use crate::{
12    set_error,
13    work_pool::{WorkPool, WorkPoolConfig, WorkPoolState},
14    work_queue::WorkerHandle,
15    LzipReader, Read,
16};
17
/// A work unit for a worker thread.
///
/// Holds the compressed bytes of a single LZIP member. `LzipReaderMt` reads
/// them from its input, and a worker thread decompresses them independently
/// with an `LzipReader`.
#[derive(Debug)]
struct WorkUnit {
    /// Compressed bytes of one member, read from `start_pos` for
    /// `compressed_size` bytes of the input.
    member_data: Vec<u8>,
}
23
/// A multi-threaded LZIP decompressor.
///
/// The input's LZIP members are located up front. Each member is handed to a
/// pool of worker threads as an independent unit of work. The decompressed
/// output is exposed through [`Read`].
pub struct LzipReaderMt<R: Read + Seek> {
    /// Seekable source of the compressed data.
    inner: R,
    /// Members found in `inner` when the reader was constructed.
    members: Vec<LzipMember>,
    /// Pool that turns `WorkUnit`s (compressed members) into decompressed byte vectors.
    work_pool: WorkPool<WorkUnit, Vec<u8>>,
    /// Decompressed bytes of the member currently being served to callers.
    current_chunk: Cursor<Vec<u8>>,
}
31
32impl<R: Read + Seek> LzipReaderMt<R> {
33    /// Creates a new multi-threaded LZIP reader.
34    ///
35    /// - `inner`: The reader to read compressed data from. Must implement Seek.
36    /// - `num_workers`: The maximum number of worker threads for decompression. Currently capped at 256 threads.
37    pub fn new(inner: R, num_workers: u32) -> io::Result<Self> {
38        let (inner, members) = scan_members(inner)?;
39        let num_members = members.len() as u64;
40
41        Ok(Self {
42            inner,
43            members,
44            work_pool: WorkPool::new(
45                WorkPoolConfig::new(num_workers, num_members),
46                worker_thread_logic,
47            ),
48            current_chunk: Cursor::new(Vec::new()),
49        })
50    }
51
52    /// Get the count of LZIP members found in the file.
53    pub fn member_count(&self) -> usize {
54        self.members.len()
55    }
56
57    fn get_next_uncompressed_chunk(&mut self) -> io::Result<Option<Vec<u8>>> {
58        // Check if we've processed all members
59        if matches!(self.work_pool.state(), WorkPoolState::Finished) {
60            return Ok(None);
61        }
62
63        self.work_pool.get_result(|index| {
64            let member = &self.members[index as usize];
65            self.inner.seek(SeekFrom::Start(member.start_pos)).unwrap();
66            let mut member_data = vec![0u8; member.compressed_size as usize];
67            self.inner.read_exact(&mut member_data).unwrap();
68            Ok(WorkUnit { member_data })
69        })
70    }
71}
72
73/// The logic for a single worker thread.
74fn worker_thread_logic(
75    worker_handle: WorkerHandle<(u64, WorkUnit)>,
76    result_tx: SyncSender<(u64, Vec<u8>)>,
77    shutdown_flag: Arc<AtomicBool>,
78    error_store: Arc<Mutex<Option<io::Error>>>,
79    active_workers: Arc<AtomicU32>,
80) {
81    while !shutdown_flag.load(Ordering::Acquire) {
82        let work_unit = match worker_handle.steal() {
83            Some(work) => {
84                active_workers.fetch_add(1, Ordering::Release);
85                work
86            }
87            None => {
88                // No more work available and queue is closed.
89                break;
90            }
91        };
92
93        let (index, WorkUnit { member_data }) = work_unit;
94
95        let mut lzip_reader = LzipReader::new(member_data.as_slice());
96
97        let mut decompressed_data = Vec::new();
98        let result = match lzip_reader.read_to_end(&mut decompressed_data) {
99            Ok(_) => decompressed_data,
100            Err(error) => {
101                active_workers.fetch_sub(1, Ordering::Release);
102                set_error(error, &error_store, &shutdown_flag);
103                return;
104            }
105        };
106
107        if result_tx.send((index, result)).is_err() {
108            active_workers.fetch_sub(1, Ordering::Release);
109            return;
110        }
111
112        active_workers.fetch_sub(1, Ordering::Release);
113    }
114}
115
116impl<R: Read + Seek> Read for LzipReaderMt<R> {
117    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
118        if buf.is_empty() {
119            return Ok(0);
120        }
121
122        let bytes_read = self.current_chunk.read(buf)?;
123
124        if bytes_read > 0 {
125            return Ok(bytes_read);
126        }
127
128        let chunk_data = self.get_next_uncompressed_chunk()?;
129
130        let Some(chunk_data) = chunk_data else {
131            // This is the clean end of the stream.
132            return Ok(0);
133        };
134
135        self.current_chunk = Cursor::new(chunk_data);
136
137        // Recursive call to read the new chunk data.
138        self.read(buf)
139    }
140}