vortex_io/file/
std_file.rs1use std::fs::File;
5use std::os::unix::fs::FileExt;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8
9use futures::future::BoxFuture;
10use futures::stream::BoxStream;
11use futures::{FutureExt, StreamExt};
12use vortex_buffer::ByteBufferMut;
13use vortex_error::{VortexError, VortexResult};
14
15use crate::file::{CoalesceWindow, IntoReadSource, IoRequest, ReadSource, ReadSourceRef};
16use crate::runtime::Handle;
17
18const COALESCING_WINDOW: CoalesceWindow = CoalesceWindow {
19 distance: 8 * 1024, max_size: 8 * 1024, };
23const CONCURRENCY: usize = 32;
24
25impl IntoReadSource for PathBuf {
26 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
27 self.as_path().into_read_source(handle)
28 }
29}
30
31impl IntoReadSource for &Path {
32 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
33 let uri = self.to_string_lossy().to_string().into();
34 let file = Arc::new(File::open(self)?);
35 Ok(Arc::new(FileIoSource { uri, file, handle }))
36 }
37}
38
39impl IntoReadSource for &str {
40 fn into_read_source(self, handle: Handle) -> VortexResult<ReadSourceRef> {
41 Path::new(self).into_read_source(handle)
42 }
43}
44
45pub(crate) struct FileIoSource {
46 uri: Arc<str>,
47 file: Arc<File>,
48 handle: Handle,
49}
50
51impl ReadSource for FileIoSource {
52 fn uri(&self) -> &Arc<str> {
53 &self.uri
54 }
55
56 fn coalesce_window(&self) -> Option<CoalesceWindow> {
57 Some(COALESCING_WINDOW)
58 }
59
60 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
61 let file = self.file.clone();
62 async move {
63 let metadata = file.metadata().map_err(VortexError::from)?;
64 Ok(metadata.len())
65 }
66 .boxed()
67 }
68
69 fn drive_send(
70 self: Arc<Self>,
71 requests: BoxStream<'static, IoRequest>,
72 ) -> BoxFuture<'static, ()> {
73 requests
74 .ready_chunks(1)
77 .map(move |reqs| {
78 let file = self.file.clone();
79 self.handle.spawn_blocking(move || {
80 for req in reqs {
81 let len = req.len();
82 let offset = req.offset();
83 let mut buffer = ByteBufferMut::with_capacity_aligned(len, req.alignment());
84 unsafe { buffer.set_len(len) };
85 req.resolve(match file.read_exact_at(&mut buffer, offset) {
86 Ok(()) => Ok(buffer.freeze()),
87 Err(e) => Err(VortexError::from(e)),
88 })
89 }
90 })
91 })
92 .buffer_unordered(CONCURRENCY)
93 .collect::<()>()
94 .boxed()
95 }
96}