1use std::fs::File;
2use std::ops::{Deref, Range, RangeBounds};
3use std::path::Path;
4use std::sync::Arc;
5use std::{fmt, io};
6
7use async_trait::async_trait;
8use ownedbytes::{OwnedBytes, StableDeref};
9
10use crate::{ByteCount, HasLen};
11
12#[async_trait]
22pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug {
23 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes>;
27
28 #[doc(hidden)]
29 async fn read_bytes_async(&self, _byte_range: Range<usize>) -> io::Result<OwnedBytes> {
30 Err(io::Error::new(
31 io::ErrorKind::Unsupported,
32 "Async read is not supported.",
33 ))
34 }
35}
36
37#[derive(Debug)]
38pub struct WrapFile {
40 file: File,
41 len: usize,
42}
43impl WrapFile {
44 pub fn new(file: File) -> io::Result<Self> {
46 let len = file.metadata()?.len() as usize;
47 Ok(WrapFile { file, len })
48 }
49}
50
51#[async_trait]
52impl FileHandle for WrapFile {
53 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
54 let file_len = self.len();
55
56 let start = range.start;
58 let end = range.end.min(file_len);
59
60 if start >= end {
62 return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid range"));
63 }
64
65 let mut buffer = vec![0; end - start];
66
67 #[cfg(unix)]
68 {
69 use std::os::unix::prelude::FileExt;
70 self.file.read_exact_at(&mut buffer, start as u64)?;
71 }
72
73 #[cfg(not(unix))]
74 {
75 use std::io::{Read, Seek};
76 let mut file = self.file.try_clone()?; file.seek(io::SeekFrom::Start(start as u64))?;
79 file.read_exact(&mut buffer)?;
81 }
82
83 Ok(OwnedBytes::new(buffer))
84 }
85 }
87impl HasLen for WrapFile {
88 fn len(&self) -> usize {
89 self.len
90 }
91}
92
93#[async_trait]
94impl FileHandle for &'static [u8] {
95 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
96 let bytes = &self[range];
97 Ok(OwnedBytes::new(bytes))
98 }
99
100 async fn read_bytes_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
101 Ok(self.read_bytes(byte_range)?)
102 }
103}
104
105impl<B> From<B> for FileSlice
106where B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync
107{
108 fn from(bytes: B) -> FileSlice {
109 FileSlice::new(Arc::new(OwnedBytes::new(bytes)))
110 }
111}
112
113#[derive(Clone)]
117pub struct FileSlice {
118 data: Arc<dyn FileHandle>,
119 range: Range<usize>,
120}
121
122impl fmt::Debug for FileSlice {
123 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124 write!(f, "FileSlice({:?}, {:?})", &self.data, self.range)
125 }
126}
127
128impl FileSlice {
129 pub fn stream_file_chunks(&self) -> impl Iterator<Item = io::Result<OwnedBytes>> + '_ {
130 let len = self.range.end;
131 let mut start = self.range.start;
132 std::iter::from_fn(move || {
133 const CHUNK_SIZE: usize = 1024 * 1024; if start < len {
137 let end = (start + CHUNK_SIZE).min(len);
138 let range = start..end;
139 let chunk = self.data.read_bytes(range);
140 start += CHUNK_SIZE;
141 match chunk {
142 Ok(chunk) => Some(Ok(chunk)),
143 Err(e) => Some(Err(e)),
144 }
145 } else {
146 None
147 }
148 })
149 }
150}
151
152fn combine_ranges<R: RangeBounds<usize>>(orig_range: Range<usize>, rel_range: R) -> Range<usize> {
163 let start: usize = orig_range.start
164 + match rel_range.start_bound().cloned() {
165 std::ops::Bound::Included(rel_start) => rel_start,
166 std::ops::Bound::Excluded(rel_start) => rel_start + 1,
167 std::ops::Bound::Unbounded => 0,
168 };
169 assert!(start <= orig_range.end);
170 let end: usize = match rel_range.end_bound().cloned() {
171 std::ops::Bound::Included(rel_end) => orig_range.start + rel_end + 1,
172 std::ops::Bound::Excluded(rel_end) => orig_range.start + rel_end,
173 std::ops::Bound::Unbounded => orig_range.end,
174 };
175 assert!(end >= start);
176 assert!(end <= orig_range.end);
177 start..end
178}
179
180impl FileSlice {
181 pub fn open(path: &Path) -> io::Result<FileSlice> {
183 let wrap_file = WrapFile::new(File::open(path)?)?;
184 Ok(FileSlice::new(Arc::new(wrap_file)))
185 }
186
187 pub fn new(file_handle: Arc<dyn FileHandle>) -> Self {
189 let num_bytes = file_handle.len();
190 FileSlice::new_with_num_bytes(file_handle, num_bytes)
191 }
192
193 #[doc(hidden)]
195 #[must_use]
196 pub fn new_with_num_bytes(file_handle: Arc<dyn FileHandle>, num_bytes: usize) -> Self {
197 FileSlice {
198 data: file_handle,
199 range: 0..num_bytes,
200 }
201 }
202
203 #[must_use]
209 #[inline]
210 pub fn slice<R: RangeBounds<usize>>(&self, byte_range: R) -> FileSlice {
211 FileSlice {
212 data: self.data.clone(),
213 range: combine_ranges(self.range.clone(), byte_range),
214 }
215 }
216
217 pub fn empty() -> FileSlice {
219 const EMPTY_SLICE: &[u8] = &[];
220 FileSlice::from(EMPTY_SLICE)
221 }
222
223 pub fn read_bytes(&self) -> io::Result<OwnedBytes> {
230 self.data.read_bytes(self.range.clone())
231 }
232
233 #[doc(hidden)]
234 pub async fn read_bytes_async(&self) -> io::Result<OwnedBytes> {
235 self.data.read_bytes_async(self.range.clone()).await
236 }
237
238 pub fn read_bytes_slice(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
242 assert!(
243 range.end <= self.len(),
244 "end of requested range exceeds the fileslice length ({} > {})",
245 range.end,
246 self.len()
247 );
248 self.data
249 .read_bytes(self.range.start + range.start..self.range.start + range.end)
250 }
251
252 #[doc(hidden)]
253 pub async fn read_bytes_slice_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
254 assert!(
255 self.range.start + byte_range.end <= self.range.end,
256 "`to` exceeds the fileslice length"
257 );
258 self.data
259 .read_bytes_async(
260 self.range.start + byte_range.start..self.range.start + byte_range.end,
261 )
262 .await
263 }
264
265 pub fn split(self, left_len: usize) -> (FileSlice, FileSlice) {
270 let left = self.slice_to(left_len);
271 let right = self.slice_from(left_len);
272 (left, right)
273 }
274
275 pub fn split_from_end(self, right_len: usize) -> (FileSlice, FileSlice) {
278 let left_len = self.len() - right_len;
279 self.split(left_len)
280 }
281
282 #[must_use]
287 pub fn slice_from(&self, from_offset: usize) -> FileSlice {
288 self.slice(from_offset..self.len())
289 }
290
291 #[must_use]
295 pub fn slice_from_end(&self, from_offset: usize) -> FileSlice {
296 self.slice(self.len() - from_offset..self.len())
297 }
298
299 #[must_use]
304 pub fn slice_to(&self, to_offset: usize) -> FileSlice {
305 self.slice(0..to_offset)
306 }
307
308 pub fn num_bytes(&self) -> ByteCount {
310 self.range.len().into()
311 }
312}
313
314#[async_trait]
315impl FileHandle for FileSlice {
316 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
317 self.read_bytes_slice(range)
318 }
319
320 async fn read_bytes_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
321 self.read_bytes_slice_async(byte_range).await
322 }
323}
324
325impl HasLen for FileSlice {
326 fn len(&self) -> usize {
327 self.range.len()
328 }
329}
330
331#[async_trait]
332impl FileHandle for OwnedBytes {
333 fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
334 Ok(self.slice(range))
335 }
336
337 async fn read_bytes_async(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
338 self.read_bytes(range)
339 }
340}
341
342#[cfg(test)]
343mod tests {
344 use std::io;
345 use std::ops::Bound;
346 use std::sync::Arc;
347
348 use super::{FileHandle, FileSlice};
349 use crate::HasLen;
350 use crate::file_slice::combine_ranges;
351
352 #[test]
353 fn test_file_slice() -> io::Result<()> {
354 let file_slice = FileSlice::new(Arc::new(b"abcdef".as_ref()));
355 assert_eq!(file_slice.len(), 6);
356 assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
357 assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
358 assert_eq!(
359 file_slice
360 .slice_from(1)
361 .slice_to(2)
362 .read_bytes()?
363 .as_slice(),
364 b"bc"
365 );
366 {
367 let (left, right) = file_slice.clone().split(0);
368 assert_eq!(left.read_bytes()?.as_slice(), b"");
369 assert_eq!(right.read_bytes()?.as_slice(), b"abcdef");
370 }
371 {
372 let (left, right) = file_slice.clone().split(2);
373 assert_eq!(left.read_bytes()?.as_slice(), b"ab");
374 assert_eq!(right.read_bytes()?.as_slice(), b"cdef");
375 }
376 {
377 let (left, right) = file_slice.clone().split_from_end(0);
378 assert_eq!(left.read_bytes()?.as_slice(), b"abcdef");
379 assert_eq!(right.read_bytes()?.as_slice(), b"");
380 }
381 {
382 let (left, right) = file_slice.split_from_end(2);
383 assert_eq!(left.read_bytes()?.as_slice(), b"abcd");
384 assert_eq!(right.read_bytes()?.as_slice(), b"ef");
385 }
386 Ok(())
387 }
388
389 #[test]
390 fn test_file_slice_trait_slice_len() {
391 let blop: &'static [u8] = b"abc";
392 let owned_bytes: Box<dyn FileHandle> = Box::new(blop);
393 assert_eq!(owned_bytes.len(), 3);
394 }
395
396 #[test]
397 fn test_slice_simple_read() -> io::Result<()> {
398 let slice = FileSlice::new(Arc::new(&b"abcdef"[..]));
399 assert_eq!(slice.len(), 6);
400 assert_eq!(slice.read_bytes()?.as_ref(), b"abcdef");
401 assert_eq!(slice.slice(1..4).read_bytes()?.as_ref(), b"bcd");
402 Ok(())
403 }
404
405 #[test]
406 fn test_slice_read_slice() -> io::Result<()> {
407 let slice_deref = FileSlice::new(Arc::new(&b"abcdef"[..]));
408 assert_eq!(slice_deref.read_bytes_slice(1..4)?.as_ref(), b"bcd");
409 Ok(())
410 }
411
412 #[test]
413 #[should_panic(expected = "end of requested range exceeds the fileslice length (10 > 6)")]
414 fn test_slice_read_slice_invalid_range_exceeds() {
415 let slice_deref = FileSlice::new(Arc::new(&b"abcdef"[..]));
416 assert_eq!(
417 slice_deref.read_bytes_slice(0..10).unwrap().as_ref(),
418 b"bcd"
419 );
420 }
421
422 #[test]
423 fn test_combine_range() {
424 assert_eq!(combine_ranges(1..3, 0..1), 1..2);
425 assert_eq!(combine_ranges(1..3, 1..), 2..3);
426 assert_eq!(combine_ranges(1..4, ..2), 1..3);
427 assert_eq!(combine_ranges(3..10, 2..5), 5..8);
428 assert_eq!(combine_ranges(2..11, 5..=7), 7..10);
429 assert_eq!(
430 combine_ranges(2..11, (Bound::Excluded(5), Bound::Unbounded)),
431 8..11
432 );
433 }
434
435 #[test]
436 #[should_panic]
437 fn test_combine_range_panics() {
438 let _ = combine_ranges(3..5, 1..4);
439 }
440}