use anyhow as ah;
use crate::drop_caches::drop_file_caches;
use crate::stream_aggregator::{DtStreamAgg, DtStreamAggChunk};
use crate::util::prettybytes;
use hhmmss::Hhmmss;
use std::cmp::min;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write, Seek, SeekFrom};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
#[cfg(not(target_os="windows"))]
use libc::ENOSPC;
#[cfg(target_os="windows")]
use winapi::shared::winerror::ERROR_DISK_FULL as ENOSPC;
pub use crate::stream_aggregator::DtStreamType;
/// Number of processed bytes (1 MiB) that must accumulate in
/// `Disktest::log_count` before `Disktest::log` looks at the clock at all.
const LOG_BYTE_THRES: u64 = 1024 * 1024;
/// Minimum number of whole seconds that must have passed since the last
/// printed progress message before `Disktest::log` prints the next one.
const LOG_SEC_THRES: u64 = 10;
/// A file (or device node) handle together with the bookkeeping that is
/// needed to drop the operating system caches when the handle is closed.
pub struct DisktestFile {
/// The open file. `Some` while open; `close()` takes it out and leaves `None`.
file: Option<File>,
/// Path the file was opened from. Printed on start and passed to
/// `drop_file_caches` on close.
path: PathBuf,
/// Offset of the most recent successful `seek()`. Passed to
/// `drop_file_caches` on close.
seek_offset: u64,
/// Number of bytes successfully written through `write()`. Cache dropping
/// on close only happens if this is non-zero.
write_count: u64,
/// Verbosity control: the higher the value, the fewer messages are printed.
quiet_level: u8,
}
impl DisktestFile {
    /// Open a file for use by [`Disktest`].
    ///
    /// * `path` - Path of the file or device to open.
    /// * `read` - Open the file for reading.
    /// * `write` - Open the file for writing. The file is created if it does
    ///   not exist yet.
    /// * `quiet_level` - Verbosity; higher values print fewer messages.
    ///
    /// # Errors
    ///
    /// Returns an error naming the path if the file cannot be opened.
    pub fn open(path: &str,
                read: bool,
                write: bool,
                quiet_level: u8) -> ah::Result<DisktestFile> {
        let path = Path::new(path);
        let file = OpenOptions::new()
            .read(read)
            .write(write)
            .create(write)
            .open(path)
            .map_err(|e| ah::format_err!("Failed to open file {:?}: {}", path, e))?;
        Ok(DisktestFile {
            file: Some(file),
            path: path.to_path_buf(),
            seek_offset: 0,
            write_count: 0,
            quiet_level,
        })
    }

    /// Build the error that every I/O method returns after `close()`.
    fn closed_error() -> io::Error {
        io::Error::new(io::ErrorKind::Other, "File already closed.")
    }

    /// Seek to the absolute position `offset` (counted from the file start).
    ///
    /// On success the offset is remembered for the cache drop in `close()`
    /// and the new position reported by the file is returned.
    fn seek(&mut self, offset: u64) -> io::Result<u64> {
        let file = self.file.as_mut().ok_or_else(Self::closed_error)?;
        let position = file.seek(SeekFrom::Start(offset))?;
        self.seek_offset = offset;
        Ok(position)
    }

    /// Flush all file data and metadata to the underlying device.
    fn sync(&mut self) -> io::Result<()> {
        self.file
            .as_mut()
            .ok_or_else(Self::closed_error)?
            .sync_all()
    }

    /// Read up to `buffer.len()` bytes into `buffer`.
    ///
    /// Returns the number of bytes read. The read may be short.
    /// A return value of `Ok(0)` for a non-empty buffer means end of file.
    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
        let file = self.file.as_mut().ok_or_else(Self::closed_error)?;
        loop {
            match file.read(buffer) {
                // An interrupted read is not a failure. No data has been
                // transferred, so simply retry. The write path gets the same
                // treatment from `write_all()`.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                result => return result,
            }
        }
    }

    /// Write the whole `buffer` to the file.
    ///
    /// The written-bytes counter is only advanced if the complete buffer
    /// was written successfully.
    fn write(&mut self, buffer: &[u8]) -> io::Result<()> {
        let file = self.file.as_mut().ok_or_else(Self::closed_error)?;
        file.write_all(buffer)?;
        self.write_count += buffer.len() as u64;
        Ok(())
    }

    /// Close the file.
    ///
    /// If data was written through this handle, the file is handed over to
    /// `drop_file_caches` together with the path, the last seek offset and
    /// the number of bytes written. A failure there is only reported as a
    /// warning. Calling `close()` again is a no-op.
    fn close(&mut self) {
        if let Some(file) = self.file.take() {
            if self.write_count > 0 {
                match drop_file_caches(file,
                                       self.path.as_path(),
                                       self.seek_offset,
                                       self.write_count) {
                    Err(e) => {
                        eprintln!("WARNING: Failed to drop operating system caches: {}", e);
                    },
                    Ok(_) if self.quiet_level < 1 => {
                        println!("Write done and successfully dropped file caches.");
                    },
                    Ok(_) => (),
                }
                self.write_count = 0;
            }
            // With nothing written, `file` is simply dropped (and thereby closed) here.
        }
    }

    /// Path this file was opened from.
    fn get_path(&self) -> &PathBuf {
        &self.path
    }

    /// Verbosity level this file was opened with.
    fn get_quiet_level(&self) -> u8 {
        self.quiet_level
    }
}
/// Closes the file when the handle goes out of scope, so that the cache
/// dropping done by `close()` also happens when a `DisktestFile` is simply
/// dropped (for example at the end of `Disktest::write`).
impl Drop for DisktestFile {
fn drop(&mut self) {
// `close()` takes the file out of the `Option`, so this does nothing
// if the file has already been closed.
self.close();
}
}
/// The disktest core: writes a generated data stream to a file and verifies
/// that the file content matches that stream.
pub struct Disktest {
/// Source of the data chunks that are written to / compared against the file.
stream_agg: DtStreamAgg,
/// Optional abort request flag. It is polled after every processed chunk;
/// once it is `true` the running operation stops with an error.
abort: Option<Arc<AtomicBool>>,
/// Bytes processed since the progress logging threshold was last evaluated.
log_count: u64,
/// Time at which the last progress message was printed.
log_time: Instant,
/// Start time of the current operation. Basis of the displayed data rate.
begin_time: Instant,
}
impl Disktest {
pub const UNLIMITED: u64 = u64::MAX;
pub fn new(algorithm: DtStreamType,
seed: Vec<u8>,
invert_pattern: bool,
nr_threads: usize,
abort: Option<Arc<AtomicBool>>) -> Disktest {
let nr_threads = if nr_threads == 0 { num_cpus::get() } else { nr_threads };
Disktest {
stream_agg: DtStreamAgg::new(algorithm,
seed,
invert_pattern,
nr_threads),
abort,
log_count: 0,
log_time: Instant::now(),
begin_time: Instant::now(),
}
}
fn log_reset(&mut self) {
self.log_count = 0;
self.log_time = Instant::now();
self.begin_time = self.log_time;
}
fn log(&mut self,
quiet_level: u8,
prefix: &str,
inc_processed: usize,
abs_processed: u64,
no_limiting: bool,
suffix: &str) {
if quiet_level < 2 {
self.log_count += inc_processed as u64;
if (self.log_count >= LOG_BYTE_THRES && quiet_level == 0) || no_limiting {
let now = Instant::now();
let expired = now.duration_since(self.log_time).as_secs() >= LOG_SEC_THRES;
if (expired && quiet_level == 0) || no_limiting {
let dur_elapsed = now - self.begin_time;
let sec_elapsed = dur_elapsed.as_secs();
let rate = if sec_elapsed > 0 { abs_processed / sec_elapsed } else { 0 };
println!("{}{} @ {}/s ({}){}",
prefix,
prettybytes(abs_processed, true, true),
prettybytes(rate, true, false),
dur_elapsed.hhmmss(),
suffix);
self.log_time = now;
}
self.log_count = 0;
}
}
}
fn init(&mut self,
file: &mut DisktestFile,
prefix: &str,
seek: u64) -> ah::Result<()> {
self.log_reset();
if file.get_quiet_level() < 2 {
println!("{} {:?}, starting at position {}...",
prefix,
file.get_path(),
prettybytes(seek, true, true));
}
let seek = match self.stream_agg.activate(seek) {
Ok(s) => s,
Err(e) => return Err(e),
};
if let Err(e) = file.seek(seek) {
return Err(ah::format_err!("File seek to {} failed: {}",
seek, e.to_string()));
}
Ok(())
}
fn write_finalize(&mut self,
file: &mut DisktestFile,
bytes_written: u64) -> ah::Result<()> {
if file.get_quiet_level() < 2 {
println!("Writing stopped. Syncing...");
}
if let Err(e) = file.sync() {
return Err(ah::format_err!("Sync failed: {}", e));
}
self.log(file.get_quiet_level(),
"Done. Wrote ", 0, bytes_written, true, ".");
Ok(())
}
pub fn write(&mut self,
file: DisktestFile,
seek: u64,
max_bytes: u64) -> ah::Result<u64> {
let mut file = file;
let mut bytes_left = max_bytes;
let mut bytes_written = 0u64;
let chunk_size = self.stream_agg.get_chunk_size() as u64;
self.init(&mut file, "Writing", seek)?;
loop {
let chunk = self.stream_agg.wait_chunk()?;
let write_len = min(chunk_size, bytes_left) as usize;
if let Err(e) = file.write(&chunk.get_data()[0..write_len]) {
if let Some(err_code) = e.raw_os_error() {
if max_bytes == Disktest::UNLIMITED &&
err_code == ENOSPC as i32 {
self.write_finalize(&mut file, bytes_written)?;
break; }
}
self.write_finalize(&mut file, bytes_written)?;
return Err(ah::format_err!("Write error: {}", e));
}
bytes_written += write_len as u64;
bytes_left -= write_len as u64;
if bytes_left == 0 {
self.write_finalize(&mut file, bytes_written)?;
break;
}
self.log(file.get_quiet_level(),
"Wrote ", write_len, bytes_written, false, " ...");
if let Some(abort) = &self.abort {
if abort.load(Ordering::Relaxed) {
self.write_finalize(&mut file, bytes_written)?;
return Err(ah::format_err!("Aborted by signal!"));
}
}
}
Ok(bytes_written)
}
fn verify_finalize(&mut self,
file: &DisktestFile,
bytes_read: u64) {
self.log(file.get_quiet_level(),
"Done. Verified ", 0, bytes_read, true, ".");
}
fn verify_failed(&self,
read_count: usize,
bytes_read: u64,
buffer: &[u8],
chunk: &DtStreamAggChunk) -> ah::Error {
for (i, buffer_byte) in buffer.iter().enumerate().take(read_count) {
if *buffer_byte != chunk.get_data()[i] {
let pos = bytes_read + i as u64;
if pos >= 1024 {
return ah::format_err!("Data MISMATCH at byte {} = {}!",
pos, prettybytes(pos, true, true))
} else {
return ah::format_err!("Data MISMATCH at byte {}!", pos)
}
}
}
panic!("Internal error: verify_failed() no mismatch.");
}
pub fn verify(&mut self,
file: DisktestFile,
seek: u64,
max_bytes: u64) -> ah::Result<u64> {
let mut file = file;
let mut bytes_left = max_bytes;
let mut bytes_read = 0u64;
let readbuf_len = self.stream_agg.get_chunk_size();
let mut buffer = vec![0; readbuf_len];
let mut read_count = 0;
let mut read_len = min(readbuf_len as u64, bytes_left) as usize;
self.init(&mut file, "Verifying", seek)?;
loop {
match file.read(&mut buffer[read_count..read_count+(read_len-read_count)]) {
Ok(n) => {
read_count += n;
assert!(read_count <= read_len);
if read_count == read_len || (read_count > 0 && n == 0) {
let chunk = self.stream_agg.wait_chunk()?;
if buffer[..read_count] != chunk.get_data()[..read_count] {
return Err(self.verify_failed(read_count, bytes_read, &buffer, &chunk));
}
bytes_read += read_count as u64;
bytes_left -= read_count as u64;
if bytes_left == 0 {
self.verify_finalize(&file, bytes_read);
break;
}
self.log(file.get_quiet_level(),
"Verified ", read_count, bytes_read, false, " ...");
read_count = 0;
read_len = min(readbuf_len as u64, bytes_left) as usize;
}
if n == 0 {
self.verify_finalize(&file, bytes_read);
break;
}
},
Err(e) => {
return Err(ah::format_err!("Read error at {}: {}",
prettybytes(bytes_read, true, true), e));
},
};
if let Some(abort) = &self.abort {
if abort.load(Ordering::Relaxed) {
self.verify_finalize(&file, bytes_read);
return Err(ah::format_err!("Aborted by signal!"));
}
}
}
Ok(bytes_read)
}
}
#[cfg(test)]
mod tests {
use crate::generator::{GeneratorChaCha8, GeneratorChaCha12, GeneratorChaCha20, GeneratorCrc};
use std::path::Path;
use super::*;
use tempfile::NamedTempFile;
// Run the common write/verify scenarios against a temporary file.
//
// `base_size` and `chunk_factor` are the generator constants of `algorithm`.
// They are only used to derive test data sizes.
// The steps below share one file and depend on the state left by the
// previous step, so their order matters.
fn run_test(algorithm: DtStreamType, base_size: usize, chunk_factor: usize) {
let mut tfile = NamedTempFile::new().unwrap();
let pstr = String::from(tfile.path().to_str().unwrap());
let path = Path::new(&pstr);
let file = tfile.as_file_mut();
// Separate handle used by the test itself to truncate, seek and modify the file.
let mut loc_file = file.try_clone().unwrap();
let seed = vec![42, 43, 44, 45];
let nr_threads = 2;
let mut dt = Disktest::new(algorithm, seed, false, nr_threads, None);
// Build a fresh `DisktestFile` around a clone of the temporary file.
// `write()` and `verify()` consume their file, so each call needs a new one.
let mk_file = || {
DisktestFile {
file: Some(file.try_clone().unwrap()),
path: path.to_path_buf(),
seek_offset: 0,
write_count: 0,
quiet_level: 0,
}
};
// Write a couple of bytes and verify them.
let nr_bytes = 1000;
assert_eq!(dt.write(mk_file(), 0, nr_bytes).unwrap(), nr_bytes);
assert_eq!(dt.verify(mk_file(), 0, u64::MAX).unwrap(), nr_bytes);
// Write a couple of bytes and verify only half of them.
let nr_bytes = 1000;
loc_file.set_len(0).unwrap();
assert_eq!(dt.write(mk_file(), 0, nr_bytes).unwrap(), nr_bytes);
assert_eq!(dt.verify(mk_file(), 0, nr_bytes / 2).unwrap(), nr_bytes / 2);
// Write a larger amount of data, sized from the generator constants and
// the thread count plus 100 extra bytes, and verify it up to end of file.
loc_file.set_len(0).unwrap();
let nr_bytes = (base_size * chunk_factor * nr_threads * 2 + 100) as u64;
assert_eq!(dt.write(mk_file(), 0, nr_bytes).unwrap(), nr_bytes);
assert_eq!(dt.verify(mk_file(), 0, u64::MAX).unwrap(), nr_bytes);
// Check that write does its own seek: the file cursor is moved to
// offset 10 beforehand, yet a verify from position 0 must still succeed.
let nr_bytes = 1000;
loc_file.set_len(100).unwrap();
loc_file.seek(SeekFrom::Start(10)).unwrap();
assert_eq!(dt.write(mk_file(), 0, nr_bytes).unwrap(), nr_bytes);
assert_eq!(dt.verify(mk_file(), 0, u64::MAX).unwrap(), nr_bytes);
// Modify the written data at offset 10 and assert that verify reports
// the mismatch at exactly that byte.
let nr_bytes = 1000;
loc_file.set_len(0).unwrap();
assert_eq!(dt.write(mk_file(), 0, nr_bytes).unwrap(), nr_bytes);
loc_file.seek(SeekFrom::Start(10)).unwrap();
writeln!(loc_file, "X").unwrap();
match dt.verify(mk_file(), 0, nr_bytes) {
Ok(_) => panic!("Verify of modified data did not fail!"),
Err(e) => assert_eq!(e.to_string(), "Data MISMATCH at byte 10!"),
}
// Check verify with seek: start verification at a range of different
// offsets into the written data.
loc_file.set_len(0).unwrap();
let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
assert_eq!(dt.write(mk_file(), 0, nr_bytes).unwrap(), nr_bytes);
for offset in (0..nr_bytes).step_by(base_size * chunk_factor / 2) {
let bytes_verified = dt.verify(mk_file(), offset, u64::MAX).unwrap();
assert!(bytes_verified > 0 && bytes_verified <= nr_bytes);
}
// Check write with seek: a second write starting at `offset` extends the
// file, and a verify from position 0 must cover the combined length.
loc_file.set_len(0).unwrap();
let nr_bytes = (base_size * chunk_factor * nr_threads * 10) as u64;
assert_eq!(dt.write(mk_file(), 0, nr_bytes).unwrap(), nr_bytes);
let offset = (base_size * chunk_factor * nr_threads * 2) as u64;
assert_eq!(dt.write(mk_file(), offset, nr_bytes).unwrap(), nr_bytes);
assert_eq!(dt.verify(mk_file(), 0, u64::MAX).unwrap(), nr_bytes + offset);
}
// Run the scenarios with the ChaCha8 generator.
#[test]
fn test_chacha8() {
run_test(DtStreamType::ChaCha8,
GeneratorChaCha8::BASE_SIZE,
GeneratorChaCha8::CHUNK_FACTOR);
}
// Run the scenarios with the ChaCha12 generator.
#[test]
fn test_chacha12() {
run_test(DtStreamType::ChaCha12,
GeneratorChaCha12::BASE_SIZE,
GeneratorChaCha12::CHUNK_FACTOR);
}
// Run the scenarios with the ChaCha20 generator.
#[test]
fn test_chacha20() {
run_test(DtStreamType::ChaCha20,
GeneratorChaCha20::BASE_SIZE,
GeneratorChaCha20::CHUNK_FACTOR);
}
// Run the scenarios with the CRC generator.
#[test]
fn test_crc() {
run_test(DtStreamType::Crc,
GeneratorCrc::BASE_SIZE,
GeneratorCrc::CHUNK_FACTOR);
}
}