// squashfs_async/data.rs

1//! Reading data blocks
2use std::ops::DerefMut;
3
4use async_compression::tokio::bufread::{XzDecoder, ZlibDecoder, ZstdDecoder};
5use fuser_async::cache::DataBlockCache;
6use fuser_async::utils::OutOf;
7use serde::Deserialize;
8use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncSeekExt, AsyncWrite};
9use tracing::*;
10
11use super::error::DecompressError;
12use super::superblock::Compression;
13use super::Error;
14use super::SquashFs;
15use crate::pools;
16
17pub async fn decompress(
18    mut input: impl AsyncBufRead + Unpin + Send + Sync,
19    compressed_size: u64,
20    mut output: impl AsyncWrite + Unpin,
21    compression: Option<Compression>,
22) -> Result<(), DecompressError> {
23    let mut input = (&mut input).take(compressed_size);
24
25    let mut input: Box<dyn crate::AsyncRead> = match compression {
26        None => Box::new(input),
27        Some(Compression::Zstd) => Box::new(ZstdDecoder::new(&mut input)),
28        Some(Compression::Gzip) => Box::new(ZlibDecoder::new(&mut input)),
29        Some(Compression::Xz) => Box::new(XzDecoder::new(&mut input)),
30        // TODO: Other schemes
31        Some(compression) => return Err(DecompressError::UnsupportedCompression(compression)),
32    };
33    tokio::io::copy(&mut input, &mut output).await?;
34
35    Ok(())
36}
37
38#[derive(Debug, Copy, Clone, Deserialize)]
39pub struct BlockSize(pub u32);
40impl BlockSize {
41    pub fn compressed(&self) -> bool {
42        (self.0 & (1 << 24)) == 0
43    }
44    pub fn compressed_size(&self) -> u64 {
45        (self.0 & 0x00FFFFFF) as u64
46    }
47}
48
/// Location of a single data block inside the archive.
#[derive(Debug)]
pub struct DataLocation {
    // Byte offset of the block (absolute when read through a pooled reader
    // with `reader_offset == 0` — see `read_file_impl`).
    pub block_start: u64,
    // Stored size plus the compressed/uncompressed flag for this block.
    pub block_size: BlockSize,
}
54
55impl<
56        T: crate::AsyncSeekBufRead,
57        R: deadpool::managed::Manager<Type = T, Error = tokio::io::Error> + Send + Sync,
58    > SquashFs<R>
59{
60    /// Read from a file from the archive
61    pub async fn read_file(
62        &self,
63        inode: u32,
64        offset: usize,
65        size: usize,
66        mut flags: pools::ReadFlags,
67        compression: Compression,
68    ) -> Result<bytes::Bytes, Error> {
69        let file = self
70            .inode_table
71            .files
72            .get(&inode)
73            .ok_or(Error::FileNotFound(None))?;
74        let size = size.min(
75            (file.file_size() as usize)
76                .checked_sub(offset)
77                .ok_or(Error::InvalidOffset)?,
78        );
79        if size == 0 {
80            return Ok(bytes::Bytes::default());
81        }
82        debug!(
83            inode,
84            offset,
85            size,
86            portion = OutOf::new(size, file.file_size()).display_full(),
87            "Reading squashfs file",
88        );
89        // Optimization for small files
90        if (file.file_size() as usize) < self.superblock.block_size as usize {
91            // Treating these separately also allows not having to worry about fragments below.
92            warn!(inode, "Accessing very small file (< block) in direct mode");
93            flags |= libc::O_DIRECT;
94        } else if let (true, Some(cache)) = (
95            (file.file_size() as usize) < self.direct_limit
96                // Skip when tailend fragments (which would require another fetch)
97                && !file.fragment().valid() && (flags & libc::O_DIRECT) != 0,
98            &self.small_files_cache,
99        ) {
100            // We read the entire underlying data at once and then decode it.
101            // This large read, along with the O_DIRECT flag, provides a hint to the backend
102            // to skip buffering and read exactly this data.
103            let first = file.data_locations().next().unwrap();
104            let tot_size = file
105                .data_locations()
106                .map(|dl| dl.block_size.compressed_size())
107                .sum::<u64>();
108            flags |= libc::O_DIRECT;
109            // Cache the entire decompressed file
110            let cached = cache
111                .insert_lock(inode as u64, async {
112                    warn!(
113                        inode,
114                        "Accessing small file (< direct limit) in direct mode"
115                    );
116                    let mut reader = self.get_reader(flags).await?;
117                    // Read the raw contents
118                    reader
119                        .seek(std::io::SeekFrom::Start(first.block_start))
120                        .await
121                        .map_err(Error::ReadFailure)?;
122                    let mut buf = bytes::BytesMut::zeroed(tot_size as usize);
123                    reader
124                        .read_exact(&mut buf)
125                        .await
126                        .map_err(Error::ReadFailure)?;
127                    let mut cursor = std::io::Cursor::new(buf.deref_mut());
128                    // Decode the contents
129                    self.read_file_impl(
130                        file,
131                        (&mut cursor, first.block_start),
132                        inode,
133                        // Use the decompressed size here
134                        (0, file.file_size() as usize),
135                        compression,
136                    )
137                    .await
138                    .map_err(Box::new)
139                })
140                .await?;
141            let mut buf = bytes::BytesMut::zeroed(size);
142            buf.copy_from_slice(&cached.data[offset..offset + size]);
143            return Ok(buf.into());
144        }
145        let mut reader = self.get_reader(flags).await?;
146        self.read_file_impl(
147            file,
148            (reader.deref_mut(), 0),
149            inode,
150            (offset, size),
151            compression,
152        )
153        .await
154    }
155    #[allow(clippy::borrowed_box)]
156    pub async fn read_file_impl(
157        &self,
158        file: &Box<dyn crate::inodes::FileInode + Send + Sync>,
159        (mut reader, reader_offset): (impl crate::AsyncSeekBufRead, u64),
160        inode: u32,
161        (offset, size): (usize, usize),
162        compression: Compression,
163    ) -> Result<bytes::Bytes, Error> {
164        let start = std::time::Instant::now();
165
166        let superblock = &self.superblock;
167        let first_block = (offset as f64 / superblock.block_size as f64).floor() as usize;
168        let block_offset = offset % self.superblock.block_size as usize;
169        let n_blocks =
170            ((block_offset + size) as f64 / self.superblock.block_size as f64).ceil() as usize;
171        let mut buf = bytes::BytesMut::zeroed(superblock.block_size as usize * n_blocks);
172        let mut buf_parts: Vec<_> = (0..n_blocks)
173            .map(|_| buf.split_off(buf.len() - superblock.block_size as usize))
174            .rev()
175            .collect();
176
177        let data_locations: Vec<_> = file
178            .data_locations()
179            .skip(first_block)
180            .take(n_blocks)
181            .collect();
182        debug!(
183            inode,
184            offset,
185            size,
186            "{} data blocks to read, {} available from regular blocks, first block {}",
187            n_blocks,
188            data_locations.len(),
189            first_block
190        );
191        // Read from regular data blocks
192        for (l, buf_part) in data_locations.iter().zip(buf_parts.iter_mut()) {
193            read_data_block(
194                &mut reader,
195                reader_offset,
196                l.block_start,
197                l.block_size,
198                buf_part.as_mut(),
199                self.cache.as_ref(),
200                compression,
201            )
202            .await?;
203        }
204        // Read last part from fragment if necessary
205        if data_locations.len() != n_blocks {
206            debug!("Reading from fragment");
207            assert!(n_blocks == data_locations.len() + 1);
208            let buf = buf_parts.last_mut().unwrap();
209            let fragment_location = file.fragment();
210            let entry = self.fragments_table.entry(fragment_location)?;
211
212            read_data_block(
213                reader,
214                reader_offset,
215                entry.start,
216                entry.size,
217                buf,
218                self.cache.as_ref(),
219                compression,
220            )
221            .await?;
222            let _ = buf.split_to(fragment_location.offset as usize);
223        }
224        for part in buf_parts {
225            buf.unsplit(part);
226        }
227        let _ = buf.split_to(block_offset);
228        let _ = buf.split_off(size);
229        let buf = buf.freeze();
230
231        if buf.len() != size {
232            return Err(Error::InvalidBufferSize);
233        }
234        debug!(
235            inode,
236            offset,
237            size,
238            speed_mb_s = buf.len() as f64 / 1e6 / start.elapsed().as_secs_f64(),
239            "Finished reading",
240        );
241        Ok(buf)
242    }
243}
/// Read and decompress a single data block into `buf`.
///
/// * `reader_offset` - subtracted from `start` before seeking (non-zero when
///   `r` wraps a buffer that does not begin at the start of the archive).
/// * `start` - offset of the block, also used as the cache key.
/// * `b` - stored size and compression flag of the block.
/// * `buf` - destination slice; expected to already have the uncompressed
///   length (a cached block of a different length is an `InvalidBufferSize`).
/// * `cache` - optional data-block cache consulted before reading and
///   populated after decompression.
pub async fn read_data_block(
    mut r: impl crate::AsyncSeekBufRead,
    reader_offset: u64,
    start: u64,
    b: BlockSize,
    buf: &mut [u8],
    cache: Option<&impl DataBlockCache<Box<Error>>>,
    compression: Compression,
) -> Result<(), Error> {
    // NOTE(review): `start - reader_offset` underflows if reader_offset > start;
    // callers appear to guarantee start >= reader_offset — confirm.
    r.seek(std::io::SeekFrom::Start(start - reader_offset))
        .await
        .map_err(Error::ReadFailure)?;
    debug!(
        compression_ratio = b.compressed_size() as f32 / buf.len() as f32,
        compressed_size = b.compressed_size(),
        start,
        cache = format!("{}", cache.map(|c| c.to_string()).unwrap_or_default()),
        "Reading data block",
    );

    // Crucial to not mess up the caching
    // (a zero stored size means there is nothing to read or cache).
    if b.compressed_size() == 0 {
        return Ok(());
    }
    // Check cache: on a hit, copy the cached bytes out and skip the read.
    if let Some(cache) = cache {
        if let Some(block) = cache.get(start).await {
            if block.data.len() != buf.len() {
                return Err(Error::InvalidBufferSize);
            }
            buf.copy_from_slice(&block.data);
            return Ok(());
        }
    }
    // Given we're reading directly into the buffer, we're not doing that in the lock insert.
    // (but we might be missing some cache hits doing so)
    let mut cursor = std::io::Cursor::new(buf);

    // `None` compression means the block is stored uncompressed (bit 24 set).
    decompress(
        &mut r,
        b.compressed_size(),
        &mut cursor,
        b.compressed().then_some(compression),
    )
    .await?;
    // Write cache: store the freshly decompressed block under `start`.
    if let Some(cache) = cache {
        let _ = cache
            .insert_lock(start, async {
                let data = cursor.into_inner();
                Ok(&*data)
            })
            .await?;
    }
    Ok(())
}