Skip to main content

vortex_io/std_file/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fs::File;
5use std::io;
6#[cfg(all(not(unix), not(windows)))]
7use std::io::Read;
8#[cfg(all(not(unix), not(windows)))]
9use std::io::Seek;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14use std::path::Path;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use vortex_array::buffer::BufferHandle;
20use vortex_buffer::Alignment;
21use vortex_buffer::ByteBufferMut;
22use vortex_error::VortexResult;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27
/// Read exactly `buffer.len()` bytes from `file` starting at `offset`.
///
/// This is a platform-specific helper that uses the most efficient method available:
///
/// * Unix: the positional `FileExt::read_exact_at`.
/// * Windows: repeated `FileExt::seek_read` calls until the buffer is full.
/// * Other targets: a seek to `offset` followed by `Read::read_exact`.
///
/// Reads interrupted by a signal (`ErrorKind::Interrupted`) are retried on every platform,
/// matching the semantics of the standard library's `read_exact` family.
///
/// # Errors
///
/// Returns an error of kind [`io::ErrorKind::UnexpectedEof`] if the file ends before the
/// buffer is filled, or any other I/O error reported by the underlying read.
///
/// # Caveats
///
/// On the fallback (non-Unix, non-Windows) path the seek and the read are two separate
/// operations on the file's shared cursor, so concurrent calls on the same `File` can
/// interleave and read from the wrong offset.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        let mut bytes_read = 0;
        while bytes_read < buffer.len() {
            match file.seek_read(&mut buffer[bytes_read..], offset + bytes_read as u64) {
                // A zero-length read before the buffer is full means we hit end-of-file.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => bytes_read += read,
                // Match `read_exact` semantics: an interrupted read is retried, not surfaced.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        use std::io::SeekFrom;
        // `Read`/`Seek` are implemented for `&File`, so a mutable binding to the shared
        // reference is enough to drive the cursor.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
59
60const COALESCING_CONFIG: CoalesceConfig = CoalesceConfig {
61    distance: 1024 * 1024,     // 1MB
62    max_size: 4 * 1024 * 1024, // 4MB
63};
64/// Default number of concurrent requests to allow for local file I/O.
65pub const DEFAULT_CONCURRENCY: usize = 32;
66
67/// An adapter type wrapping a [`File`] to implement [`VortexReadAt`].
68pub struct FileReadAt {
69    uri: Arc<str>,
70    file: Arc<File>,
71    handle: Handle,
72}
73
74impl FileReadAt {
75    /// Open a file for reading.
76    pub fn open(path: impl AsRef<Path>, handle: Handle) -> VortexResult<Self> {
77        let path = path.as_ref();
78        let uri = path.to_string_lossy().to_string().into();
79        let file = Arc::new(File::open(path)?);
80        Ok(Self { uri, file, handle })
81    }
82}
83
84impl VortexReadAt for FileReadAt {
85    fn uri(&self) -> Option<&Arc<str>> {
86        Some(&self.uri)
87    }
88
89    fn coalesce_config(&self) -> Option<CoalesceConfig> {
90        Some(COALESCING_CONFIG)
91    }
92
93    fn concurrency(&self) -> usize {
94        DEFAULT_CONCURRENCY
95    }
96
97    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
98        let file = self.file.clone();
99        async move {
100            let metadata = file.metadata()?;
101            Ok(metadata.len())
102        }
103        .boxed()
104    }
105
106    fn read_at(
107        &self,
108        offset: u64,
109        length: usize,
110        alignment: Alignment,
111    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
112        let file = self.file.clone();
113        let handle = self.handle.clone();
114        async move {
115            handle
116                .spawn_blocking(move || {
117                    let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
118                    unsafe { buffer.set_len(length) };
119                    read_exact_at(&file, &mut buffer, offset)?;
120                    Ok(BufferHandle::new_host(buffer.freeze()))
121                })
122                .await
123        }
124        .boxed()
125    }
126}