lzma_rust2/lzip/
reader_mt.rs

1use std::{
2    io::{self, Cursor, Seek, SeekFrom},
3    sync::{
4        atomic::{AtomicBool, AtomicU32, Ordering},
5        mpsc::SyncSender,
6        Arc, Mutex,
7    },
8};
9
10use super::{scan_members, LzipMember};
11use crate::{
12    set_error,
13    work_pool::{WorkPool, WorkPoolConfig, WorkPoolState},
14    work_queue::WorkerHandle,
15    LzipReader, Read,
16};
17
/// Payload handed from the reader to a worker thread: the still-compressed
/// bytes of a single LZIP member.
#[derive(Debug)]
struct WorkUnit {
    /// Raw compressed bytes of one member, read from the input before being
    /// queued for decompression.
    member_data: Vec<u8>,
}
23
24/// A multi-threaded LZIP decompressor.
25pub struct LzipReaderMt<R: Read + Seek> {
26    inner: R,
27    members: Vec<LzipMember>,
28    work_pool: WorkPool<WorkUnit, Vec<u8>>,
29    current_chunk: Cursor<Vec<u8>>,
30}
31
32impl<R: Read + Seek> LzipReaderMt<R> {
33    /// Creates a new multi-threaded LZIP reader.
34    ///
35    /// - `inner`: The reader to read compressed data from. Must implement Seek.
36    /// - `num_workers`: The maximum number of worker threads for decompression. Currently capped at 256 threads.
37    pub fn new(inner: R, num_workers: u32) -> io::Result<Self> {
38        let (inner, members) = scan_members(inner)?;
39        let num_members = members.len() as u64;
40
41        Ok(Self {
42            inner,
43            members,
44            work_pool: WorkPool::new(
45                WorkPoolConfig::new(num_workers, num_members),
46                worker_thread_logic,
47            ),
48            current_chunk: Cursor::new(Vec::new()),
49        })
50    }
51
52    /// Get the count of LZIP members found in the file.
53    pub fn member_count(&self) -> usize {
54        self.members.len()
55    }
56
57    fn get_next_uncompressed_chunk(&mut self) -> io::Result<Option<Vec<u8>>> {
58        // Check if we've processed all members
59        if matches!(self.work_pool.state(), WorkPoolState::Finished) {
60            return Ok(None);
61        }
62
63        self.work_pool.get_result(|index| {
64            let member = &self.members[index as usize];
65            self.inner.seek(SeekFrom::Start(member.start_pos)).unwrap();
66            let mut member_data = vec![0u8; member.compressed_size as usize];
67            self.inner.read_exact(&mut member_data).unwrap();
68            Ok(WorkUnit { member_data })
69        })
70    }
71}
72
73/// The logic for a single worker thread.
74fn worker_thread_logic(
75    worker_handle: WorkerHandle<(u64, WorkUnit)>,
76    result_tx: SyncSender<(u64, Vec<u8>)>,
77    shutdown_flag: Arc<AtomicBool>,
78    error_store: Arc<Mutex<Option<io::Error>>>,
79    active_workers: Arc<AtomicU32>,
80) {
81    while !shutdown_flag.load(Ordering::Acquire) {
82        let work_unit = match worker_handle.steal() {
83            Some(work) => {
84                active_workers.fetch_add(1, Ordering::Release);
85                work
86            }
87            None => {
88                // No more work available and queue is closed.
89                break;
90            }
91        };
92
93        let (index, WorkUnit { member_data }) = work_unit;
94
95        let reader_result = LzipReader::new(member_data.as_slice());
96
97        let mut lzip_reader = match reader_result {
98            Ok(reader) => reader,
99            Err(error) => {
100                active_workers.fetch_sub(1, Ordering::Release);
101                set_error(error, &error_store, &shutdown_flag);
102                return;
103            }
104        };
105
106        let mut decompressed_data = Vec::new();
107        let result = match lzip_reader.read_to_end(&mut decompressed_data) {
108            Ok(_) => decompressed_data,
109            Err(error) => {
110                active_workers.fetch_sub(1, Ordering::Release);
111                set_error(error, &error_store, &shutdown_flag);
112                return;
113            }
114        };
115
116        if result_tx.send((index, result)).is_err() {
117            active_workers.fetch_sub(1, Ordering::Release);
118            return;
119        }
120
121        active_workers.fetch_sub(1, Ordering::Release);
122    }
123}
124
125impl<R: Read + Seek> Read for LzipReaderMt<R> {
126    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
127        if buf.is_empty() {
128            return Ok(0);
129        }
130
131        let bytes_read = self.current_chunk.read(buf)?;
132
133        if bytes_read > 0 {
134            return Ok(bytes_read);
135        }
136
137        let chunk_data = self.get_next_uncompressed_chunk()?;
138
139        let Some(chunk_data) = chunk_data else {
140            // This is the clean end of the stream.
141            return Ok(0);
142        };
143
144        self.current_chunk = Cursor::new(chunk_data);
145
146        // Recursive call to read the new chunk data.
147        self.read(buf)
148    }
149}