Skip to main content

vortex_io/std_file/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fs::File;
5use std::io;
6#[cfg(all(not(unix), not(windows)))]
7use std::io::Read;
8#[cfg(all(not(unix), not(windows)))]
9use std::io::Seek;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14use std::path::Path;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use vortex_array::buffer::BufferHandle;
20use vortex_array::memory::DefaultHostAllocator;
21use vortex_array::memory::HostAllocatorRef;
22use vortex_buffer::Alignment;
23use vortex_error::VortexResult;
24
25use crate::CoalesceConfig;
26use crate::VortexReadAt;
27use crate::runtime::Handle;
28
/// Fill `buffer` completely with the bytes of `file` that start at byte `offset`.
///
/// The strategy is chosen per platform at compile time:
///
/// - **Unix**: delegates to `FileExt::read_exact_at`.
/// - **Windows**: calls `FileExt::seek_read` in a loop, because a single call may return
///   fewer bytes than requested. Transient `io::ErrorKind::Interrupted` errors are retried
///   so that this branch behaves like the other two.
/// - **Other targets**: seeks to `offset` and then calls `Read::read_exact`. This moves the
///   file's cursor, so concurrent calls sharing one `File` can interfere with each other on
///   such targets.
///
/// # Errors
///
/// Returns an error of kind [`io::ErrorKind::UnexpectedEof`] if the file ends before `buffer`
/// has been filled, or whatever other I/O error the underlying read reports.
#[cfg(not(target_arch = "wasm32"))]
pub fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        // Number of leading bytes of `buffer` that have been filled so far.
        let mut filled = 0usize;
        while filled < buffer.len() {
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                // Zero bytes with space still left in the buffer means end of file.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => filled += read,
                // Retry interrupted reads, matching `read_exact_at` / `read_exact` semantics.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        use std::io::SeekFrom;
        // `Read` and `Seek` are implemented for `&File`, so a shared reference is enough.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
60
/// How many requests may be in flight at once against a local file by default.
///
/// [`FileReadAt`] reports this value from its `concurrency()` implementation.
pub const DEFAULT_CONCURRENCY: usize = 32;
63
64/// An adapter type wrapping a [`File`] to implement [`VortexReadAt`].
65pub struct FileReadAt {
66    uri: Arc<str>,
67    file: Arc<File>,
68    handle: Handle,
69    allocator: HostAllocatorRef,
70}
71
72impl FileReadAt {
73    /// Open a file for reading.
74    pub fn open(path: impl AsRef<Path>, handle: Handle) -> VortexResult<Self> {
75        Self::open_with_allocator(path, handle, Arc::new(DefaultHostAllocator))
76    }
77
78    /// Open a file for reading using a custom writable buffer allocator.
79    pub fn open_with_allocator(
80        path: impl AsRef<Path>,
81        handle: Handle,
82        allocator: HostAllocatorRef,
83    ) -> VortexResult<Self> {
84        let path = path.as_ref();
85        let uri = path.to_string_lossy().to_string().into();
86        let file = Arc::new(File::open(path)?);
87        Ok(Self {
88            uri,
89            file,
90            handle,
91            allocator,
92        })
93    }
94}
95
96impl VortexReadAt for FileReadAt {
97    fn uri(&self) -> Option<&Arc<str>> {
98        Some(&self.uri)
99    }
100
101    fn coalesce_config(&self) -> Option<CoalesceConfig> {
102        Some(CoalesceConfig::file())
103    }
104
105    fn concurrency(&self) -> usize {
106        DEFAULT_CONCURRENCY
107    }
108
109    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
110        let file = Arc::clone(&self.file);
111        async move {
112            let metadata = file.metadata()?;
113            Ok(metadata.len())
114        }
115        .boxed()
116    }
117
118    fn read_at(
119        &self,
120        offset: u64,
121        length: usize,
122        alignment: Alignment,
123    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
124        let file = Arc::clone(&self.file);
125        let handle = self.handle.clone();
126        let allocator = Arc::clone(&self.allocator);
127        async move {
128            handle
129                .spawn_blocking(move || {
130                    let mut buffer = allocator.allocate(length, alignment)?;
131                    read_exact_at(&file, buffer.as_mut_slice(), offset)?;
132                    Ok(BufferHandle::new_host(buffer.freeze()))
133                })
134                .await
135        }
136        .boxed()
137    }
138}