vortex_io/std_file/
read_at.rs1use std::fs::File;
5use std::io;
6#[cfg(all(not(unix), not(windows)))]
7use std::io::Read;
8#[cfg(all(not(unix), not(windows)))]
9use std::io::Seek;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12#[cfg(windows)]
13use std::os::windows::fs::FileExt;
14use std::path::Path;
15use std::sync::Arc;
16
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use vortex_array::buffer::BufferHandle;
20use vortex_buffer::Alignment;
21use vortex_buffer::ByteBufferMut;
22use vortex_error::VortexResult;
23
24use crate::CoalesceConfig;
25use crate::VortexReadAt;
26use crate::runtime::Handle;
27
/// Reads exactly `buffer.len()` bytes from `file`, starting at byte `offset`.
///
/// The implementation is chosen at compile time:
/// - Unix: delegates to `FileExt::read_exact_at`.
/// - Windows: loops over `FileExt::seek_read`, because a single call may return fewer
///   bytes than requested.
/// - Any other target: seeks to `offset` and then calls `read_exact`, which repositions
///   the file's cursor.
///
/// An empty `buffer` succeeds without reading anything.
///
/// # Errors
///
/// Returns an error of kind [`io::ErrorKind::UnexpectedEof`] if the file ends before
/// `buffer` has been filled, or whatever other I/O error the underlying read reports.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        let mut filled = 0;
        while filled < buffer.len() {
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                // A zero-length read with space still left means the file ended early.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => filled += read,
                // Retry interrupted reads so this branch behaves like the `read_exact`
                // style calls used on the other platforms.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        use std::io::SeekFrom;
        // `Read` and `Seek` are implemented for `&File`, so a shared reference suffices.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
59
60const COALESCING_CONFIG: CoalesceConfig = CoalesceConfig {
61 distance: 1024 * 1024, max_size: 4 * 1024 * 1024, };
64pub const DEFAULT_CONCURRENCY: usize = 32;
66
67pub struct FileReadAt {
69 uri: Arc<str>,
70 file: Arc<File>,
71 handle: Handle,
72}
73
74impl FileReadAt {
75 pub fn open(path: impl AsRef<Path>, handle: Handle) -> VortexResult<Self> {
77 let path = path.as_ref();
78 let uri = path.to_string_lossy().to_string().into();
79 let file = Arc::new(File::open(path)?);
80 Ok(Self { uri, file, handle })
81 }
82}
83
84impl VortexReadAt for FileReadAt {
85 fn uri(&self) -> Option<&Arc<str>> {
86 Some(&self.uri)
87 }
88
89 fn coalesce_config(&self) -> Option<CoalesceConfig> {
90 Some(COALESCING_CONFIG)
91 }
92
93 fn concurrency(&self) -> usize {
94 DEFAULT_CONCURRENCY
95 }
96
97 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
98 let file = self.file.clone();
99 async move {
100 let metadata = file.metadata()?;
101 Ok(metadata.len())
102 }
103 .boxed()
104 }
105
106 fn read_at(
107 &self,
108 offset: u64,
109 length: usize,
110 alignment: Alignment,
111 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
112 let file = self.file.clone();
113 let handle = self.handle.clone();
114 async move {
115 handle
116 .spawn_blocking(move || {
117 let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);
118 unsafe { buffer.set_len(length) };
119 read_exact_at(&file, &mut buffer, offset)?;
120 Ok(BufferHandle::new_host(buffer.freeze()))
121 })
122 .await
123 }
124 .boxed()
125 }
126}