vortex_io/std_file/
read_at.rs1use std::fs::File;
5use std::io;
6#[cfg(all(not(unix), not(windows)))]
7use std::io::Read;
8#[cfg(all(not(unix), not(windows)))]
9use std::io::Seek;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14use std::path::Path;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use vortex_array::buffer::BufferHandle;
20use vortex_buffer::Alignment;
21use vortex_buffer::ByteBufferMut;
22use vortex_error::VortexResult;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27
/// Fills `buffer` completely with bytes read from `file`, starting at byte `offset`.
///
/// The implementation is selected per platform:
///
/// - **Unix**: delegates to [`FileExt::read_exact_at`].
/// - **Windows**: calls [`FileExt::seek_read`] repeatedly until the buffer is full,
///   retrying when a call fails with [`io::ErrorKind::Interrupted`].
/// - **Anything else**: seeks to `offset` and then calls `read_exact`. This moves the
///   file's cursor and is two separate steps, so concurrent callers sharing the same
///   `File` may interfere with each other on such targets.
///
/// An empty `buffer` succeeds without reading anything on Unix and Windows.
///
/// # Errors
///
/// Returns [`io::ErrorKind::UnexpectedEof`] if the file ends before `buffer` has been
/// filled, and propagates any other I/O error reported by the operating system.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        let mut filled = 0usize;
        while filled < buffer.len() {
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                // A zero-length read means end of file was reached before the buffer was full.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => filled += read,
                // An interrupted read transferred nothing; retry it instead of failing,
                // which is what the `read_exact` family of functions does.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        use std::io::SeekFrom;
        // `Read` and `Seek` are implemented for `&File`, so a shared reference suffices.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
59
60const COALESCING_CONFIG: CoalesceConfig = CoalesceConfig {
61 distance: 8 * 1024, max_size: 8 * 1024, };
/// Concurrency value that `FileReadAt::concurrency` reports for local files.
pub const DEFAULT_CONCURRENCY: usize = 32;
67
68pub struct FileReadAt {
70 uri: Arc<str>,
71 file: Arc<File>,
72 handle: Handle,
73}
74
75impl FileReadAt {
76 pub fn open(path: impl AsRef<Path>, handle: Handle) -> VortexResult<Self> {
78 let path = path.as_ref();
79 let uri = path.to_string_lossy().to_string().into();
80 let file = Arc::new(File::open(path)?);
81 Ok(Self { uri, file, handle })
82 }
83}
84
85impl VortexReadAt for FileReadAt {
86 fn uri(&self) -> Option<&Arc<str>> {
87 Some(&self.uri)
88 }
89
90 fn coalesce_config(&self) -> Option<CoalesceConfig> {
91 Some(COALESCING_CONFIG)
92 }
93
94 fn concurrency(&self) -> usize {
95 DEFAULT_CONCURRENCY
96 }
97
98 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
99 let file = self.file.clone();
100 async move {
101 let metadata = file.metadata()?;
102 Ok(metadata.len())
103 }
104 .boxed()
105 }
106
107 fn read_at(
108 &self,
109 offset: u64,
110 length: usize,
111 alignment: Alignment,
112 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
113 let file = self.file.clone();
114 let handle = self.handle.clone();
115 async move {
116 handle
117 .spawn_blocking(move || {
118 let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
119 unsafe { buffer.set_len(length) };
120 read_exact_at(&file, &mut buffer, offset)?;
121 Ok(BufferHandle::new_host(buffer.freeze()))
122 })
123 .await
124 }
125 .boxed()
126 }
127}