vortex_io/std_file/
read_at.rs1use std::fs::File;
5use std::io;
6#[cfg(all(not(unix), not(windows)))]
7use std::io::Read;
8#[cfg(all(not(unix), not(windows)))]
9use std::io::Seek;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14use std::path::Path;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use vortex_array::buffer::BufferHandle;
20use vortex_array::memory::DefaultHostAllocator;
21use vortex_array::memory::HostAllocatorRef;
22use vortex_buffer::Alignment;
23use vortex_error::VortexResult;
24
25use crate::CoalesceConfig;
26use crate::VortexReadAt;
27use crate::runtime::Handle;
28
/// Reads exactly `buffer.len()` bytes from `file`, starting at byte `offset`.
///
/// The strategy depends on the target platform:
///
/// * Unix: delegates to `FileExt::read_exact_at`.
/// * Windows: calls `FileExt::seek_read` repeatedly until the buffer is full.
/// * Anything else: seeks to `offset` and then calls `Read::read_exact`.
///
/// # Errors
///
/// Returns [`io::ErrorKind::UnexpectedEof`] when the file ends before `buffer`
/// is filled, and otherwise propagates the underlying I/O error. An
/// [`io::ErrorKind::Interrupted`] error is retried rather than reported, so the
/// Windows path follows the same contract as the standard `read_exact` family.
#[cfg(not(target_arch = "wasm32"))]
pub fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        // `seek_read` may return fewer bytes than requested, so keep reading
        // from where the previous call stopped.
        let mut filled = 0usize;
        while filled < buffer.len() {
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                // Zero bytes with space left in the buffer means end of file.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(n) => filled += n,
                // Retry instead of surfacing a spurious failure.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        use std::io::SeekFrom;
        // `&File` implements `Read` and `Seek`, so a mutable binding to the
        // shared reference is enough here.
        // NOTE(review): this moves the file's cursor in two separate steps
        // (seek, then read); callers sharing one `File` across threads could
        // interleave them — confirm whether this fallback is reachable.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
60
/// Concurrency value reported by [`FileReadAt`] through its `concurrency` method.
pub const DEFAULT_CONCURRENCY: usize = 32;
63
64pub struct FileReadAt {
66 uri: Arc<str>,
67 file: Arc<File>,
68 handle: Handle,
69 allocator: HostAllocatorRef,
70}
71
72impl FileReadAt {
73 pub fn open(path: impl AsRef<Path>, handle: Handle) -> VortexResult<Self> {
75 Self::open_with_allocator(path, handle, Arc::new(DefaultHostAllocator))
76 }
77
78 pub fn open_with_allocator(
80 path: impl AsRef<Path>,
81 handle: Handle,
82 allocator: HostAllocatorRef,
83 ) -> VortexResult<Self> {
84 let path = path.as_ref();
85 let uri = path.to_string_lossy().to_string().into();
86 let file = Arc::new(File::open(path)?);
87 Ok(Self {
88 uri,
89 file,
90 handle,
91 allocator,
92 })
93 }
94}
95
96impl VortexReadAt for FileReadAt {
97 fn uri(&self) -> Option<&Arc<str>> {
98 Some(&self.uri)
99 }
100
101 fn coalesce_config(&self) -> Option<CoalesceConfig> {
102 Some(CoalesceConfig::file())
103 }
104
105 fn concurrency(&self) -> usize {
106 DEFAULT_CONCURRENCY
107 }
108
109 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
110 let file = Arc::clone(&self.file);
111 async move {
112 let metadata = file.metadata()?;
113 Ok(metadata.len())
114 }
115 .boxed()
116 }
117
118 fn read_at(
119 &self,
120 offset: u64,
121 length: usize,
122 alignment: Alignment,
123 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
124 let file = Arc::clone(&self.file);
125 let handle = self.handle.clone();
126 let allocator = Arc::clone(&self.allocator);
127 async move {
128 handle
129 .spawn_blocking(move || {
130 let mut buffer = allocator.allocate(length, alignment)?;
131 read_exact_at(&file, buffer.as_mut_slice(), offset)?;
132 Ok(BufferHandle::new_host(buffer.freeze()))
133 })
134 .await
135 }
136 .boxed()
137 }
138}