vortex_io/file/
std_file.rs1use std::fs::File;
5#[cfg(not(unix))]
6use std::io::Read;
7#[cfg(not(unix))]
8use std::io::Seek;
9#[cfg(unix)]
10use std::os::unix::fs::FileExt;
11#[cfg(windows)]
12use std::os::windows::fs::FileExt;
13use std::path::Path;
14use std::path::PathBuf;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::StreamExt;
19use futures::future::BoxFuture;
20use futures::stream::BoxStream;
21use vortex_buffer::ByteBufferMut;
22use vortex_error::VortexError;
23use vortex_error::VortexResult;
24
25use crate::file::CoalesceWindow;
26use crate::file::IntoReadSource;
27use crate::file::IoRequest;
28use crate::file::ReadSource;
29use crate::file::ReadSourceRef;
30use crate::runtime::Handle;
31
/// Reads exactly `buffer.len()` bytes from `file`, starting at byte `offset`.
///
/// The file handle passed here may be shared between several threads that read
/// at the same time, so wherever the platform offers one, a positional read is
/// used that does not depend on the handle's shared cursor:
///
/// * unix: `FileExt::read_exact_at`.
/// * windows: `FileExt::seek_read`, looped until the buffer is full.
/// * any other target: `seek` followed by `read_exact`. This path moves the
///   shared cursor and is therefore only correct when callers do not read the
///   same handle concurrently.
///
/// # Errors
///
/// Returns the underlying I/O error, or an error of kind
/// [`std::io::ErrorKind::UnexpectedEof`] when the file ends before `buffer`
/// has been filled. The contents of `buffer` are unspecified on error.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        use std::io::Error;
        use std::io::ErrorKind;

        // `seek_read` takes an explicit offset per call, so concurrent readers of the
        // same handle cannot observe each other's position. It may return fewer bytes
        // than requested, hence the loop.
        let mut filled = 0usize;
        while filled < buffer.len() {
            // `usize` -> `u64` is lossless on every supported target.
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                Ok(0) => {
                    return Err(Error::new(
                        ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => filled += read,
                // Mirror `read_exact` semantics: an interrupted call is simply retried.
                Err(err) if err.kind() == ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    #[cfg(not(any(unix, windows)))]
    {
        use std::io::SeekFrom;
        // `Read` and `Seek` are implemented for `&File`, so a shared reference suffices.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
48
49const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
50 distance: 8 * 1024, max_size: 8 * 1024, };
54const CONCURRENCY: usize = 32;
55
56impl IntoReadSource for PathBuf {
57 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
58 self.as_path().into_read_source(handle)
59 }
60}
61
62impl IntoReadSource for &Path {
63 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
64 let uri = self.to_string_lossy().to_string().into();
65 let file = Arc::new(File::open(self)?);
66 Ok(Arc::new(FileIoSource { uri, file, handle }))
67 }
68}
69
70impl IntoReadSource for &str {
71 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
72 Path::new(self).into_read_source(handle)
73 }
74}
75
76pub(crate) struct FileIoSource {
77 uri: Arc<str>,
78 file: Arc<File>,
79 handle: Handle,
80}
81
82impl ReadSource for FileIoSource {
83 fn uri(&self) -> &Arc<str> {
84 &self.uri
85 }
86
87 fn coalesce_window(&self) -> Option<CoalesceWindow> {
88 Some(COALESCING_WINDOW)
89 }
90
91 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
92 let file = self.file.clone();
93 async move {
94 let metadata = file.metadata().map_err(VortexError::from)?;
95 Ok(metadata.len())
96 }
97 .boxed()
98 }
99
100 fn drive_send(
101 self: Arc<Self>,
102 requests: BoxStream<'static, IoRequest>,
103 ) -> BoxFuture<'static, ()> {
104 requests
105 .ready_chunks(1)
108 .map(move |reqs| {
109 let file = self.file.clone();
110 self.handle.spawn_blocking(move || {
111 for req in reqs {
112 let len = req.len();
113 let offset = req.offset();
114 let mut buffer = ByteBufferMut::with_capacity_aligned(len, req.alignment());
115 unsafe { buffer.set_len(len) };
116
117 let buffer_res = read_exact_at(&file, &mut buffer, offset);
118
119 req.resolve(
120 buffer_res
121 .map(|_| buffer.freeze())
122 .map_err(VortexError::from),
123 )
124 }
125 })
126 })
127 .buffer_unordered(CONCURRENCY)
128 .collect::<()>()
129 .boxed()
130 }
131}