Skip to main content

vortex_io/std_file/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fs::File;
5use std::io;
6#[cfg(all(not(unix), not(windows)))]
7use std::io::Read;
8#[cfg(all(not(unix), not(windows)))]
9use std::io::Seek;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14use std::path::Path;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use vortex_array::buffer::BufferHandle;
20use vortex_buffer::Alignment;
21use vortex_buffer::ByteBufferMut;
22use vortex_error::VortexResult;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27
/// Reads exactly `buffer.len()` bytes from `file`, starting at byte `offset`.
///
/// This is a platform-specific helper that uses the most direct positional read available:
///
/// - **Unix:** delegates to `FileExt::read_exact_at` (positional read; the file cursor is
///   not affected).
/// - **Windows:** loops over `seek_read` until the buffer is full. Short reads are
///   continued, and [`io::ErrorKind::Interrupted`] is retried, matching the semantics of
///   [`std::io::Read::read_exact`].
/// - **Other non-wasm32 targets:** seeks the file cursor to `offset`, then calls
///   `read_exact`.
///
/// An empty `buffer` returns `Ok(())` without reading any bytes.
///
/// # Errors
///
/// Returns an error of kind [`io::ErrorKind::UnexpectedEof`] if the file ends before
/// `buffer` is filled, or the underlying I/O error otherwise. On error, the contents of
/// `buffer` are unspecified.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        let mut filled = 0;
        while filled < buffer.len() {
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                // The file ended before the buffer could be filled.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => filled += read,
                // An interrupted read is transient: retry the same range, as
                // `Read::read_exact` does, instead of failing the whole request.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        use std::io::SeekFrom;
        // NOTE(review): seek + read are two separate operations on the cursor shared by
        // every handle to this `File`. Concurrent callers (e.g. several blocking tasks
        // holding the same `Arc<File>`) can interleave and read the wrong range on these
        // targets. Serializing access would fix this if these targets matter.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
59
60const COALESCING_CONFIG: CoalesceConfig = CoalesceConfig {
61    // TODO(ngates): these numbers don't make sense if we're using spawn_blocking..
62    distance: 8 * 1024, // KB
63    max_size: 8 * 1024, // KB
64};
65/// Default number of concurrent requests to allow for local file I/O.
66pub const DEFAULT_CONCURRENCY: usize = 32;
67
68/// An adapter type wrapping a [`File`] to implement [`VortexReadAt`].
69pub struct FileReadAt {
70    uri: Arc<str>,
71    file: Arc<File>,
72    handle: Handle,
73}
74
75impl FileReadAt {
76    /// Open a file for reading.
77    pub fn open(path: impl AsRef<Path>, handle: Handle) -> VortexResult<Self> {
78        let path = path.as_ref();
79        let uri = path.to_string_lossy().to_string().into();
80        let file = Arc::new(File::open(path)?);
81        Ok(Self { uri, file, handle })
82    }
83}
84
85impl VortexReadAt for FileReadAt {
86    fn uri(&self) -> Option<&Arc<str>> {
87        Some(&self.uri)
88    }
89
90    fn coalesce_config(&self) -> Option<CoalesceConfig> {
91        Some(COALESCING_CONFIG)
92    }
93
94    fn concurrency(&self) -> usize {
95        DEFAULT_CONCURRENCY
96    }
97
98    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
99        let file = self.file.clone();
100        async move {
101            let metadata = file.metadata()?;
102            Ok(metadata.len())
103        }
104        .boxed()
105    }
106
107    fn read_at(
108        &self,
109        offset: u64,
110        length: usize,
111        alignment: Alignment,
112    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
113        let file = self.file.clone();
114        let handle = self.handle.clone();
115        async move {
116            handle
117                .spawn_blocking(move || {
118                    let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
119                    unsafe { buffer.set_len(length) };
120                    read_exact_at(&file, &mut buffer, offset)?;
121                    Ok(BufferHandle::new_host(buffer.freeze()))
122                })
123                .await
124        }
125        .boxed()
126    }
127}