vortex_io/file/
std_file.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fs::File;
5#[cfg(not(unix))]
6use std::io::Read;
7#[cfg(not(unix))]
8use std::io::Seek;
9#[cfg(unix)]
10use std::os::unix::fs::FileExt;
11#[cfg(windows)]
12use std::os::windows::fs::FileExt;
13use std::path::Path;
14use std::path::PathBuf;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::StreamExt;
19use futures::future::BoxFuture;
20use futures::stream::BoxStream;
21use vortex_buffer::ByteBufferMut;
22use vortex_error::VortexError;
23use vortex_error::VortexResult;
24
25use crate::file::CoalesceWindow;
26use crate::file::IntoReadSource;
27use crate::file::IoRequest;
28use crate::file::ReadSource;
29use crate::file::ReadSourceRef;
30use crate::runtime::Handle;
31
/// Read exactly `buffer.len()` bytes from `file` starting at `offset`.
///
/// This is a platform-specific helper that uses the most efficient method available:
///
/// * Unix: a positional `read_exact_at`, which never touches the file cursor.
/// * Windows: repeated positional `seek_read` calls until the buffer is full. Each call carries
///   its own offset, so callers sharing one `File` across threads cannot corrupt each other's
///   reads the way a separate `seek` followed by `read` can.
/// * Anything else: `seek` + `read_exact` on the shared cursor (not safe for concurrent use).
///
/// # Errors
///
/// Returns an error of kind [`std::io::ErrorKind::UnexpectedEof`] if the file ends before
/// `buffer` has been filled, or whatever I/O error the operating system reports.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        // NOTE(review): with this branch in place the file-level `Read`/`Seek` imports (gated on
        // `not(unix)`) are no longer used on Windows; consider gating them on
        // `not(any(unix, windows))`.
        let mut filled = 0usize;
        while filled < buffer.len() {
            // `usize` -> `u64` is lossless on every supported target.
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                // A zero-length read with space left in the buffer means end of file.
                Ok(0) => {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                // Short reads are allowed; keep going from where this one stopped.
                Ok(read) => filled += read,
                // Mirror `read_exact`, which retries interrupted reads.
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    #[cfg(not(any(unix, windows)))]
    {
        use std::io::SeekFrom;
        // No positional read available: this moves the cursor of the shared handle.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
48
/// Coalescing parameters that `FileIoSource::coalesce_window` reports for local files.
///
/// NOTE(review): both fields evaluate to 8192. Presumably the unit is bytes (i.e. 8 KiB), which
/// is what the trailing "KB" notes appear to mean — confirm against `CoalesceWindow`.
const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
    // TODO(ngates): these numbers don't make sense if we're using spawn_blocking..
    distance: 8 * 1024, // KB
    max_size: 8 * 1024, // KB
};
/// Limit passed to `buffer_unordered` in `FileIoSource::drive_send`: the maximum number of
/// spawned read tasks that are awaited concurrently.
const CONCURRENCY: usize = 32;
55
56impl IntoReadSource for PathBuf {
57    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
58        self.as_path().into_read_source(handle)
59    }
60}
61
62impl IntoReadSource for &Path {
63    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
64        let uri = self.to_string_lossy().to_string().into();
65        let file = Arc::new(File::open(self)?);
66        Ok(Arc::new(FileIoSource { uri, file, handle }))
67    }
68}
69
70impl IntoReadSource for &str {
71    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
72        Path::new(self).into_read_source(handle)
73    }
74}
75
/// A [`ReadSource`] backed by a local file opened through `std::fs::File`.
pub(crate) struct FileIoSource {
    /// Identifier returned by `uri()`; built from the lossy string form of the opened path.
    uri: Arc<str>,
    /// Shared handle to the open file; cloned into each spawned read task.
    file: Arc<File>,
    /// Runtime handle used to `spawn_blocking` the read tasks.
    handle: Handle,
}
81
82impl ReadSource for FileIoSource {
83    fn uri(&self) -> &Arc<str> {
84        &self.uri
85    }
86
87    fn coalesce_window(&self) -> Option<CoalesceWindow> {
88        Some(COALESCING_WINDOW)
89    }
90
91    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
92        let file = self.file.clone();
93        async move {
94            let metadata = file.metadata().map_err(VortexError::from)?;
95            Ok(metadata.len())
96        }
97        .boxed()
98    }
99
100    fn drive_send(
101        self: Arc<Self>,
102        requests: BoxStream<'static, IoRequest>,
103    ) -> BoxFuture<'static, ()> {
104        requests
105            // Amortize the cost of spawn_blocking by batching available requests.
106            // Too much batching, and we reduce concurrency.
107            .ready_chunks(1)
108            .map(move |reqs| {
109                let file = self.file.clone();
110                self.handle.spawn_blocking(move || {
111                    for req in reqs {
112                        let len = req.len();
113                        let offset = req.offset();
114                        let mut buffer = ByteBufferMut::with_capacity_aligned(len, req.alignment());
115                        unsafe { buffer.set_len(len) };
116
117                        let buffer_res = read_exact_at(&file, &mut buffer, offset);
118
119                        req.resolve(
120                            buffer_res
121                                .map(|_| buffer.freeze())
122                                .map_err(VortexError::from),
123                        )
124                    }
125                })
126            })
127            .buffer_unordered(CONCURRENCY)
128            .collect::<()>()
129            .boxed()
130    }
131}