use super::DataReaderTrait;
use crate::{Blob, ByteRange};
use anyhow::{Result, ensure};
use async_trait::async_trait;
use std::{fs::File, io::Read, path::Path};
use versatiles_derive::context;
/// Data reader backed by a file on the local filesystem.
///
/// Instances are created with `DataReaderFile::open`, which fills in all three fields.
#[derive(Debug)]
pub struct DataReaderFile {
/// Canonicalized path of the opened file, converted lossily to a string.
name: String,
/// Open handle that every read goes through.
file: File,
/// Length of the file in bytes, taken from its metadata at open time.
size: u64,
}
impl DataReaderFile {
    /// Opens the file at `path` for reading and returns a boxed reader.
    ///
    /// The stored name is the canonicalized path and the stored size is the
    /// length reported by the file's metadata right after opening.
    ///
    /// # Errors
    ///
    /// Fails when `path` does not exist, is not absolute, or is not a regular
    /// file, and when canonicalizing, opening, or querying the file fails.
    #[context("while opening file {path:?}")]
    pub fn open(path: &Path) -> Result<Box<DataReaderFile>> {
        // Validate the caller-supplied path before touching the filesystem handle.
        ensure!(path.exists(), "file {path:?} does not exist");
        ensure!(path.is_absolute(), "path {path:?} must be absolute");
        ensure!(path.is_file(), "path {path:?} must be a file");

        let canonical = path.canonicalize()?;
        let file = File::open(&canonical)?;
        let size = file.metadata()?.len();
        let name = canonical.to_string_lossy().into_owned();

        Ok(Box::new(Self { name, file, size }))
    }
}
#[async_trait]
impl DataReaderTrait for DataReaderFile {
#[context("while reading range {range:?} from file '{}'", self.name)]
async fn read_range(&self, range: &ByteRange) -> Result<Blob> {
let mut buffer = vec![0; usize::try_from(range.length).context("Range length too large for this platform")?];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
self.file.read_exact_at(&mut buffer, range.offset).with_context(|| {
format!(
"failed to read {} bytes at offset {} in file '{}'",
range.length, range.offset, self.name
)
})?;
}
#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
self.file.seek_read(&mut buffer, range.offset).with_context(|| {
format!(
"failed to read {} bytes at offset {} in file '{}'",
range.length, range.offset, self.name
)
})?;
}
#[cfg(not(any(unix, windows)))]
{
compile_error!("Platform not supported: need position-independent file I/O");
}
Ok(Blob::from(buffer))
}
#[context("while reading all bytes from file '{}'", self.name)]
async fn read_all(&self) -> Result<Blob> {
let mut buffer = vec![0; usize::try_from(self.size).context("File size too large for this platform")?];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
self
.file
.read_exact_at(&mut buffer, 0)
.with_context(|| format!("failed to read all {} bytes from file '{}'", self.size, self.name))?;
}
#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
self
.file
.seek_read(&mut buffer, 0)
.with_context(|| format!("failed to read all {} bytes from file '{}'", self.size, self.name))?;
}
Ok(Blob::from(buffer))
}
fn get_name(&self) -> &str {
&self.name
}
}
/// Sequential, cursor-based reading that forwards straight to the wrapped file handle.
impl Read for DataReaderFile {
    /// Reads into `buf` from the file's current cursor position and returns
    /// the number of bytes read, exactly as `File::read` does.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        Read::read(&mut self.file, buf)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::assert_wildcard;
    use assert_fs::NamedTempFile;
    use std::io::Write;

    /// Creates a temporary file called `name` and fills it with `contents`.
    ///
    /// The write handle is dropped before returning, so the file is closed
    /// by the time a test opens it for reading.
    fn temp_file_with(name: &str, contents: &[u8]) -> Result<NamedTempFile> {
        let temp = NamedTempFile::new(name)?;
        File::create(temp.path())?.write_all(contents)?;
        Ok(temp)
    }

    /// Opening works for a file that exists and fails for a path that was never created.
    #[tokio::test]
    async fn new() -> Result<()> {
        let existing = temp_file_with("testfile.txt", b"Hello, world!")?;
        let missing = NamedTempFile::new("nonexistent.txt")?;

        assert!(DataReaderFile::open(existing.path()).is_ok());
        assert!(DataReaderFile::open(missing.path()).is_err());
        Ok(())
    }

    /// A range in the middle of the file yields exactly the requested bytes.
    #[tokio::test]
    async fn read_range() -> Result<()> {
        let fixture = temp_file_with("testfile.txt", b"Hello, world!")?;
        let reader = DataReaderFile::open(fixture.path())?;

        let range = ByteRange { offset: 4, length: 6 };
        let blob = reader.read_range(&range).await?;
        assert_eq!(blob.as_str(), "o, wor");
        Ok(())
    }

    /// The reported name ends with the file name that was opened.
    #[tokio::test]
    async fn get_name() -> Result<()> {
        let fixture = temp_file_with("testfile.txt", b"Hello, world!")?;
        let reader = DataReaderFile::open(fixture.path())?;

        assert_wildcard!(reader.get_name(), "*testfile.txt");
        Ok(())
    }

    /// The synchronous `Read` implementation returns the full file contents.
    #[test]
    fn read_sync_and_read_trait() -> Result<()> {
        let fixture = temp_file_with("testfile_sync.txt", b"Sync read test")?;
        let mut reader = DataReaderFile::open(fixture.path()).unwrap();

        let mut contents = Vec::new();
        reader.read_to_end(&mut contents)?;
        assert_eq!(contents, b"Sync read test");
        Ok(())
    }

    /// `read_all` returns every byte that was written to the file.
    #[tokio::test]
    async fn test_read_all_method() -> Result<()> {
        let fixture = temp_file_with("testfile_all.txt", b"Async read all test")?;
        let reader = DataReaderFile::open(fixture.path())?;

        let blob = reader.read_all().await?;
        assert_eq!(blob.as_slice(), b"Async read all test");
        Ok(())
    }

    /// Many tasks reading ranges through one shared reader each see the right bytes.
    #[tokio::test]
    async fn concurrent_range_reads_return_correct_data() -> Result<()> {
        use std::sync::Arc;
        use tokio::task::JoinSet;

        // 10240 bytes where each byte equals its own offset modulo 256.
        let pattern: Vec<u8> = (0..=u8::MAX).cycle().take(10240).collect();
        let fixture = temp_file_with("concurrent_test.dat", &pattern)?;

        let reader = Arc::new(DataReaderFile::open(fixture.path())?);
        let mut tasks = JoinSet::new();
        for task_id in 0..100 {
            let shared = Arc::clone(&reader);
            tasks.spawn(async move {
                let offset = (task_id * 100) % 10000;
                let length = 100;
                let range = ByteRange::new(offset, length);
                let blob = shared.read_range(&range).await?;
                for (i, &byte) in blob.as_slice().iter().enumerate() {
                    #[allow(clippy::cast_possible_truncation)]
                    let expected = ((offset + i as u64) & 0xFF) as u8;
                    assert_eq!(
                        byte,
                        expected,
                        "Task {}: Byte {} at offset {} is {} but expected {}",
                        task_id,
                        i,
                        offset + i as u64,
                        byte,
                        expected
                    );
                }
                Ok::<_, anyhow::Error>(())
            });
        }

        // Surface both join failures (panics) and errors returned by the tasks.
        while let Some(outcome) = tasks.join_next().await {
            outcome??;
        }
        Ok(())
    }
}