vortex_io/file/
std_file.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fs::File;
5use std::os::unix::fs::FileExt;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8
9use futures::future::BoxFuture;
10use futures::stream::BoxStream;
11use futures::{FutureExt, StreamExt};
12use vortex_buffer::ByteBufferMut;
13use vortex_error::{VortexError, VortexResult};
14
15use crate::file::{CoalesceWindow, IntoReadSource, IoRequest, ReadSource, ReadSourceRef};
16use crate::runtime::Handle;
17
18const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
19    // TODO(ngates): these numbers don't make sense if we're using spawn_blocking..
20    distance: 8 * 1024, // KB
21    max_size: 8 * 1024, // KB
22};
23const CONCURRENCY: usize = 32;
24
25impl IntoReadSource for PathBuf {
26    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
27        self.as_path().into_read_source(handle)
28    }
29}
30
31impl IntoReadSource for &Path {
32    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
33        let uri = self.to_string_lossy().to_string().into();
34        let file = Arc::new(File::open(self)?);
35        Ok(Arc::new(FileIoSource { uri, file, handle }))
36    }
37}
38
39impl IntoReadSource for &str {
40    fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
41        Path::new(self).into_read_source(handle)
42    }
43}
44
45pub(crate) struct FileIoSource {
46    uri: Arc<str>,
47    file: Arc<File>,
48    handle: Handle,
49}
50
51impl ReadSource for FileIoSource {
52    fn uri(&self) -> &Arc<str> {
53        &self.uri
54    }
55
56    fn coalesce_window(&self) -> Option<CoalesceWindow> {
57        Some(COALESCING_WINDOW)
58    }
59
60    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
61        let file = self.file.clone();
62        async move {
63            let metadata = file.metadata().map_err(VortexError::from)?;
64            Ok(metadata.len())
65        }
66        .boxed()
67    }
68
69    fn drive_send(
70        self: Arc<Self>,
71        requests: BoxStream<'static, IoRequest>,
72    ) -> BoxFuture<'static, ()> {
73        requests
74            // Amortize the cost of spawn_blocking by batching available requests.
75            // Too much batching, and we reduce concurrency.
76            .ready_chunks(1)
77            .map(move |reqs| {
78                let file = self.file.clone();
79                self.handle.spawn_blocking(move || {
80                    for req in reqs {
81                        let len = req.len();
82                        let offset = req.offset();
83                        let mut buffer = ByteBufferMut::with_capacity_aligned(len, req.alignment());
84                        unsafe { buffer.set_len(len) };
85                        req.resolve(match file.read_exact_at(&mut buffer, offset) {
86                            Ok(()) => Ok(buffer.freeze()),
87                            Err(e) => Err(VortexError::from(e)),
88                        })
89                    }
90                })
91            })
92            .buffer_unordered(CONCURRENCY)
93            .collect::<()>()
94            .boxed()
95    }
96}