use std::ops::DerefMut;

use async_compression::tokio::bufread::{XzDecoder, ZlibDecoder, ZstdDecoder};
use fuser_async::cache::DataBlockCache;
use fuser_async::utils::OutOf;
use serde::Deserialize;
use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncSeekExt, AsyncWrite};
use tracing::*;

use super::error::DecompressError;
use super::superblock::Compression;
use super::Error;
use super::SquashFs;
use crate::pools;

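/// Decompress `compressed_size` bytes from `input` into `output`.
///
/// With `compression == None` the data is copied as-is; otherwise the
/// matching streaming decoder is used. Unsupported compression formats are
/// rejected with `DecompressError::UnsupportedCompression`.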
pub async fn decompress(
    mut input: impl AsyncBufRead + Unpin + Send + Sync,
    compressed_size: u64,
    mut output: impl AsyncWrite + Unpin,
    compression: Option<Compression>,
) -> Result<(), DecompressError> {
    // Never read past the end of the compressed payload.
    let mut input = (&mut input).take(compressed_size);

    let mut input: Box<dyn crate::AsyncRead> = match compression {
        None => Box::new(input),
        Some(Compression::Zstd) => Box::new(ZstdDecoder::new(&mut input)),
        // SquashFS "gzip" data is a zlib stream, hence the zlib decoder.
        Some(Compression::Gzip) => Box::new(ZlibDecoder::new(&mut input)),
        Some(Compression::Xz) => Box::new(XzDecoder::new(&mut input)),
        Some(compression) => return Err(DecompressError::UnsupportedCompression(compression)),
    };
    tokio::io::copy(&mut input, &mut output).await?;

    Ok(())
}
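
// Illustrative sketch, not part of the original file: round-trips a buffer
// through `decompress` with zstd. Assumes the `tokio` and `zstd` features of
// `async_compression` (already required by the decoders above) and tokio's
// `#[tokio::test]` macro being available in dev builds.
#[cfg(test)]
mod decompress_tests {
    use super::*;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn zstd_roundtrip() {
        let payload = b"hello squashfs".to_vec();

        // Compress the payload with the matching async encoder.
        let mut encoder =
            async_compression::tokio::write::ZstdEncoder::new(std::io::Cursor::new(Vec::new()));
        encoder.write_all(&payload).await.unwrap();
        encoder.shutdown().await.unwrap();
        let compressed = encoder.into_inner().into_inner();

        // Decompress it back through the helper under test.
        let mut output = std::io::Cursor::new(Vec::new());
        decompress(
            std::io::Cursor::new(compressed.as_slice()),
            compressed.len() as u64,
            &mut output,
            Some(Compression::Zstd),
        )
        .await
        .unwrap();
        assert_eq!(output.into_inner(), payload);
    }
}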

/// Block size entry for a data block, as stored on disk.
///
/// Bit 24 is set when the block is stored uncompressed; the lower 24 bits
/// hold the on-disk (compressed) size in bytes.
#[derive(Debug, Copy, Clone, Deserialize)]
pub struct BlockSize(pub u32);
impl BlockSize {
    pub fn compressed(&self) -> bool {
        (self.0 & (1 << 24)) == 0
    }
    pub fn compressed_size(&self) -> u64 {
        (self.0 & 0x00FFFFFF) as u64
    }
}
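
// Illustrative sketch, not part of the original file: checks the block-size
// encoding documented above (bit 24 = "stored uncompressed", low 24 bits =
// on-disk size).
#[cfg(test)]
mod block_size_tests {
    use super::BlockSize;

    #[test]
    fn compressed_flag_and_size() {
        let compressed = BlockSize(0x0001_2345);
        assert!(compressed.compressed());
        assert_eq!(compressed.compressed_size(), 0x0001_2345);

        let uncompressed = BlockSize((1 << 24) | 0x2000);
        assert!(!uncompressed.compressed());
        assert_eq!(uncompressed.compressed_size(), 0x2000);
    }
}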

/// Location of a single data block in the archive.
#[derive(Debug)]
pub struct DataLocation {
    pub block_start: u64,
    pub block_size: BlockSize,
}

impl<
        T: crate::AsyncSeekBufRead,
        R: deadpool::managed::Manager<Type = T, Error = tokio::io::Error> + Send + Sync,
    > SquashFs<R>
{
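    /// Read `size` bytes of file `inode` starting at `offset`.
    ///
    /// The requested range is clamped to the file size. Small files may be
    /// served from the small-files cache; otherwise the relevant data blocks
    /// (and trailing fragment, if any) are read and decompressed.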
    pub async fn read_file(
        &self,
        inode: u32,
        offset: usize,
        size: usize,
        mut flags: pools::ReadFlags,
        compression: Compression,
    ) -> Result<bytes::Bytes, Error> {
        let file = self
            .inode_table
            .files
            .get(&inode)
            .ok_or(Error::FileNotFound(None))?;
        // Clamp the read to the end of the file.
        let size = size.min(
            (file.file_size() as usize)
                .checked_sub(offset)
                .ok_or(Error::InvalidOffset)?,
        );
        if size == 0 {
            return Ok(bytes::Bytes::default());
        }
        debug!(
            inode,
            offset,
            size,
            portion = OutOf::new(size, file.file_size()).display_full(),
            "Reading squashfs file",
        );
        if (file.file_size() as usize) < self.superblock.block_size as usize {
            // Force direct reads for files smaller than a single block.
            warn!(inode, "Accessing very small file (< block) in direct mode");
            flags |= libc::O_DIRECT;
        } else if let (true, Some(cache)) = (
            (file.file_size() as usize) < self.direct_limit
                && !file.fragment().valid()
                && (flags & libc::O_DIRECT) != 0,
            &self.small_files_cache,
        ) {
            // Small file (below the direct-read limit) without a fragment: read all of
            // its blocks in one go and cache the decompressed contents, keyed by inode.
            let first = file.data_locations().next().unwrap();
            let tot_size = file
                .data_locations()
                .map(|dl| dl.block_size.compressed_size())
                .sum::<u64>();
            flags |= libc::O_DIRECT;
            let cached = cache
                .insert_lock(inode as u64, async {
                    warn!(
                        inode,
                        "Accessing small file (< direct limit) in direct mode"
                    );
                    let mut reader = self.get_reader(flags).await?;
                    reader
                        .seek(std::io::SeekFrom::Start(first.block_start))
                        .await
                        .map_err(Error::ReadFailure)?;
                    let mut buf = bytes::BytesMut::zeroed(tot_size as usize);
                    reader
                        .read_exact(&mut buf)
                        .await
                        .map_err(Error::ReadFailure)?;
                    let mut cursor = std::io::Cursor::new(buf.deref_mut());
                    self.read_file_impl(
                        file,
                        (&mut cursor, first.block_start),
                        inode,
                        (0, file.file_size() as usize),
                        compression,
                    )
                    .await
                    .map_err(Box::new)
                })
                .await?;
            let mut buf = bytes::BytesMut::zeroed(size);
            buf.copy_from_slice(&cached.data[offset..offset + size]);
            return Ok(buf.into());
        }
        // General case: read the requested range block by block.
        let mut reader = self.get_reader(flags).await?;
        self.read_file_impl(
            file,
            (reader.deref_mut(), 0),
            inode,
            (offset, size),
            compression,
        )
        .await
    }
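
    /// Lower-level read used by `read_file`: fetch and decompress the data
    /// blocks covering `offset..offset + size` through `reader`, whose
    /// position 0 corresponds to archive offset `reader_offset`, and return
    /// exactly `size` bytes.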
    #[allow(clippy::borrowed_box)]
    pub async fn read_file_impl(
        &self,
        file: &Box<dyn crate::inodes::FileInode + Send + Sync>,
        (mut reader, reader_offset): (impl crate::AsyncSeekBufRead, u64),
        inode: u32,
        (offset, size): (usize, usize),
        compression: Compression,
    ) -> Result<bytes::Bytes, Error> {
        let start = std::time::Instant::now();

        let superblock = &self.superblock;
        let first_block = (offset as f64 / superblock.block_size as f64).floor() as usize;
        let block_offset = offset % self.superblock.block_size as usize;
        let n_blocks =
            ((block_offset + size) as f64 / self.superblock.block_size as f64).ceil() as usize;
        // Allocate one contiguous buffer and split it into per-block parts, so the
        // parts can later be rejoined with `unsplit` without copying.
        let mut buf = bytes::BytesMut::zeroed(superblock.block_size as usize * n_blocks);
        let mut buf_parts: Vec<_> = (0..n_blocks)
            .map(|_| buf.split_off(buf.len() - superblock.block_size as usize))
            .rev()
            .collect();

        let data_locations: Vec<_> = file
            .data_locations()
            .skip(first_block)
            .take(n_blocks)
            .collect();
        debug!(
            inode,
            offset,
            size,
            "{} data blocks to read, {} available from regular blocks, first block {}",
            n_blocks,
            data_locations.len(),
            first_block
        );
        for (l, buf_part) in data_locations.iter().zip(buf_parts.iter_mut()) {
            read_data_block(
                &mut reader,
                reader_offset,
                l.block_start,
                l.block_size,
                buf_part.as_mut(),
                self.cache.as_ref(),
                compression,
            )
            .await?;
        }
        if data_locations.len() != n_blocks {
            // The tail of the file is stored in a fragment block.
            debug!("Reading from fragment");
            assert!(n_blocks == data_locations.len() + 1);
            let buf = buf_parts.last_mut().unwrap();
            let fragment_location = file.fragment();
            let entry = self.fragments_table.entry(fragment_location)?;

            read_data_block(
                reader,
                reader_offset,
                entry.start,
                entry.size,
                buf,
                self.cache.as_ref(),
                compression,
            )
            .await?;
            // Drop the part of the fragment block that belongs to other files.
            let _ = buf.split_to(fragment_location.offset as usize);
        }
        for part in buf_parts {
            buf.unsplit(part);
        }
        // Trim to the requested range.
        let _ = buf.split_to(block_offset);
        let _ = buf.split_off(size);
        let buf = buf.freeze();

        if buf.len() != size {
            return Err(Error::InvalidBufferSize);
        }
        debug!(
            inode,
            offset,
            size,
            speed_mb_s = buf.len() as f64 / 1e6 / start.elapsed().as_secs_f64(),
            "Finished reading",
        );
        Ok(buf)
    }
}
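
/// Read a single data block starting at archive offset `start` into `buf`,
/// serving it from `cache` when possible and decompressing it with
/// `compression` when the block is stored compressed.
///
/// `reader_offset` is the archive offset corresponding to position 0 of `r`.
/// A compressed size of 0 denotes a sparse (all-zero) block and leaves `buf`
/// untouched.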
pub async fn read_data_block(
    mut r: impl crate::AsyncSeekBufRead,
    reader_offset: u64,
    start: u64,
    b: BlockSize,
    buf: &mut [u8],
    cache: Option<&impl DataBlockCache<Box<Error>>>,
    compression: Compression,
) -> Result<(), Error> {
    r.seek(std::io::SeekFrom::Start(start - reader_offset))
        .await
        .map_err(Error::ReadFailure)?;
    debug!(
        compression_ratio = b.compressed_size() as f32 / buf.len() as f32,
        compressed_size = b.compressed_size(),
        start,
        cache = format!("{}", cache.map(|c| c.to_string()).unwrap_or_default()),
        "Reading data block",
    );

    if b.compressed_size() == 0 {
        // Sparse block: the output buffer is already zeroed.
        return Ok(());
    }
    // Serve the block from the cache when possible.
    if let Some(cache) = cache {
        if let Some(block) = cache.get(start).await {
            if block.data.len() != buf.len() {
                return Err(Error::InvalidBufferSize);
            }
            buf.copy_from_slice(&block.data);
            return Ok(());
        }
    }
    let mut cursor = std::io::Cursor::new(buf);

    // Decompress (or copy, for uncompressed blocks) into the output buffer.
    decompress(
        &mut r,
        b.compressed_size(),
        &mut cursor,
        b.compressed().then_some(compression),
    )
    .await?;
    // Populate the cache with the freshly decompressed block.
    if let Some(cache) = cache {
        let _ = cache
            .insert_lock(start, async {
                let data = cursor.into_inner();
                Ok(&*data)
            })
            .await?;
    }
    Ok(())
}