use std::fs::File;
use std::io;
#[cfg(all(not(unix), not(windows)))]
use std::io::Read;
#[cfg(all(not(unix), not(windows)))]
use std::io::Seek;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;
use std::path::Path;
use std::sync::Arc;
use futures::FutureExt;
use futures::future::BoxFuture;
use vortex_array::buffer::BufferHandle;
use vortex_array::memory::DefaultHostAllocator;
use vortex_array::memory::HostAllocatorRef;
use vortex_buffer::Alignment;
use vortex_error::VortexResult;
use crate::CoalesceConfig;
use crate::VortexReadAt;
use crate::runtime::Handle;
/// Fills `buffer` completely with bytes read from `file`, starting at byte `offset`.
///
/// The strategy is selected at compile time:
///
/// * Unix: delegates to `FileExt::read_exact_at`.
/// * Windows: repeatedly calls `FileExt::seek_read` until the buffer is full, because a
///   single call may return fewer bytes than requested.
/// * Any other target: seeks to `offset` and then calls `Read::read_exact`. This path moves
///   the file's cursor.
///   NOTE(review): if the same `File` is read concurrently on such a target, the seek and the
///   read are two separate steps and may interleave — confirm whether that matters to callers.
///
/// This function is not compiled for `wasm32` targets.
///
/// # Errors
///
/// Returns an error of kind [`io::ErrorKind::UnexpectedEof`] if the end of the file is reached
/// before `buffer` is full, or any other I/O error reported by the underlying read.
#[cfg(not(target_arch = "wasm32"))]
pub fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> {
    #[cfg(unix)]
    {
        file.read_exact_at(buffer, offset)
    }
    #[cfg(windows)]
    {
        let mut filled = 0;
        while filled < buffer.len() {
            match file.seek_read(&mut buffer[filled..], offset + filled as u64) {
                // Zero bytes with space still left in the buffer means end of file.
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "failed to fill whole buffer",
                    ));
                }
                Ok(read) => filled += read,
                // Retry interrupted reads, matching the contract of the Unix
                // `read_exact_at` and of `Read::read_exact`.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        use std::io::SeekFrom;

        // `Read` and `Seek` are implemented for `&File`, so a mutable binding of the shared
        // reference is enough to call them.
        let mut file_ref = file;
        file_ref.seek(SeekFrom::Start(offset))?;
        file_ref.read_exact(buffer)
    }
}
/// Concurrency level reported by the `concurrency()` method of `FileReadAt`'s `VortexReadAt`
/// implementation.
pub const DEFAULT_CONCURRENCY: usize = 32;
/// A `VortexReadAt` source backed by a local file opened for reading.
///
/// Reads are run through `Handle::spawn_blocking`, and each read fills a buffer obtained from
/// the configured host allocator.
pub struct FileReadAt {
/// Lossy UTF-8 rendering of the path the file was opened from.
uri: Arc<str>,
/// The open file, reference-counted so each returned future can hold its own handle to it.
file: Arc<File>,
/// Runtime handle used to spawn the blocking read work.
handle: Handle,
/// Allocator that provides the destination buffer for every read.
allocator: HostAllocatorRef,
}
impl FileReadAt {
    /// Opens the file at `path` for reading, using a `DefaultHostAllocator` for read buffers.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened.
    pub fn open(path: impl AsRef<Path>, handle: Handle) -> VortexResult<Self> {
        Self::open_with_allocator(path, handle, Arc::new(DefaultHostAllocator))
    }

    /// Opens the file at `path` for reading, allocating read buffers from `allocator`.
    ///
    /// The reader's URI is the lossy UTF-8 rendering of `path`.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened.
    pub fn open_with_allocator(
        path: impl AsRef<Path>,
        handle: Handle,
        allocator: HostAllocatorRef,
    ) -> VortexResult<Self> {
        let location = path.as_ref();
        let file = File::open(location).map(Arc::new)?;
        let uri: Arc<str> = Arc::from(location.to_string_lossy().into_owned());
        Ok(Self {
            uri,
            file,
            handle,
            allocator,
        })
    }
}
impl VortexReadAt for FileReadAt {
    /// Returns the string form of the path this reader was opened with.
    fn uri(&self) -> Option<&Arc<str>> {
        let uri = &self.uri;
        Some(uri)
    }

    /// Returns the configuration produced by `CoalesceConfig::file()`.
    fn coalesce_config(&self) -> Option<CoalesceConfig> {
        let config = CoalesceConfig::file();
        Some(config)
    }

    /// Returns [`DEFAULT_CONCURRENCY`].
    fn concurrency(&self) -> usize {
        DEFAULT_CONCURRENCY
    }

    /// Resolves to the file's length in bytes, as reported by its metadata.
    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
        let file = Arc::clone(&self.file);
        async move { Ok(file.metadata()?.len()) }.boxed()
    }

    /// Reads exactly `length` bytes starting at `offset` into a freshly allocated buffer.
    ///
    /// The buffer is requested from the reader's allocator with the given `alignment`.
    /// Allocation and the read itself both happen inside a job handed to
    /// `Handle::spawn_blocking`; the returned future awaits that job.
    fn read_at(
        &self,
        offset: u64,
        length: usize,
        alignment: Alignment,
    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
        // Clone everything the job needs so the future owns its inputs and is `'static`.
        let allocator = Arc::clone(&self.allocator);
        let file = Arc::clone(&self.file);
        let handle = self.handle.clone();

        let read_job = move || -> VortexResult<BufferHandle> {
            let mut buffer = allocator.allocate(length, alignment)?;
            read_exact_at(&file, buffer.as_mut_slice(), offset)?;
            let frozen = buffer.freeze();
            Ok(BufferHandle::new_host(frozen))
        };

        async move { handle.spawn_blocking(read_job).await }.boxed()
    }
}