vortex_io/std_file/
read_at.rs1use std::fs::File;
5use std::io;
6#[cfg(all(not(unix), not(windows)))]
7use std::io::Read;
8#[cfg(all(not(unix), not(windows)))]
9use std::io::Seek;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14use std::path::Path;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use vortex_array::buffer::BufferHandle;
20use vortex_buffer::Alignment;
21use vortex_buffer::ByteBufferMut;
22use vortex_error::VortexResult;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27
/// Reads exactly `buffer.len()` bytes from `file` into `buffer`, starting at byte `offset`.
///
/// The strategy is chosen at compile time:
///
/// - **Unix**: delegates to `FileExt::read_exact_at`.
/// - **Windows**: calls `FileExt::seek_read` in a loop until the buffer is full, retrying
///   reads that fail with [`io::ErrorKind::Interrupted`].
/// - **Other targets**: seeks to `offset` and then calls `read_exact`. Both steps go through
///   the file's single shared cursor, so the pair is serialised behind a lock private to this
///   function; without it, concurrent callers sharing one `File` could read from each
///   other's offsets.
///
/// # Errors
///
/// Returns [`io::ErrorKind::UnexpectedEof`] if the file ends before `buffer` is filled, or
/// any other I/O error reported by the underlying read.
#[cfg(not(target_arch = "wasm32"))]
pub fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        let mut filled = 0;
        while filled < buffer.len() {
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                // A zero-length read before the buffer is full means end of file.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => filled += read,
                // Retry interrupted reads, matching the behaviour of `read_exact`.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        use std::io::SeekFrom;
        use std::sync::Mutex;

        // Seek + read is not atomic and mutates the cursor shared by every `&File` to the
        // same handle, so only one caller at a time may run the pair.
        static CURSOR_LOCK: Mutex<()> = Mutex::new(());
        // The lock guards no data, so a poisoned lock is still safe to reuse.
        let _guard = CURSOR_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
59
/// Concurrency value that [`FileReadAt`] reports from its `VortexReadAt::concurrency`
/// implementation.
pub const DEFAULT_CONCURRENCY: usize = 32;
62
63pub struct FileReadAt {
65 uri: Arc<str>,
66 file: Arc<File>,
67 handle: Handle,
68}
69
70impl FileReadAt {
71 pub fn open(path: impl AsRef<Path>, handle: Handle) -> VortexResult<Self> {
73 let path = path.as_ref();
74 let uri = path.to_string_lossy().to_string().into();
75 let file = Arc::new(File::open(path)?);
76 Ok(Self { uri, file, handle })
77 }
78}
79
80impl VortexReadAt for FileReadAt {
81 fn uri(&self) -> Option<&Arc<str>> {
82 Some(&self.uri)
83 }
84
85 fn coalesce_config(&self) -> Option<CoalesceConfig> {
86 Some(CoalesceConfig::file())
87 }
88
89 fn concurrency(&self) -> usize {
90 DEFAULT_CONCURRENCY
91 }
92
93 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
94 let file = self.file.clone();
95 async move {
96 let metadata = file.metadata()?;
97 Ok(metadata.len())
98 }
99 .boxed()
100 }
101
102 fn read_at(
103 &self,
104 offset: u64,
105 length: usize,
106 alignment: Alignment,
107 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
108 let file = self.file.clone();
109 let handle = self.handle.clone();
110 async move {
111 handle
112 .spawn_blocking(move || {
113 let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
114 unsafe { buffer.set_len(length) };
115 read_exact_at(&file, &mut buffer, offset)?;
116 Ok(BufferHandle::new_host(buffer.freeze()))
117 })
118 .await
119 }
120 .boxed()
121 }
122}